diff --git a/.goreleaser.yml b/.goreleaser.yml index b6ae41e..0aa3b90 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -22,9 +22,8 @@ changelog: - '^docs:' - '^test:' dockers: - - image: cycoresystems/dispatchers - tag_templates: - - "{{ .Tag }}" - - "v{{ .Major }}" - - "v{{ .Major }}.{{ .Minor }}" - - latest + - image_templates: + - "cycoresystems/dispatchers:{{ .Tag }}" + - "cycoresystems/dispatchers:v{{ .Major }}" + - "cycoresystems/dispatchers:v{{ .Major }}.{{ .Minor }}" + - cycoresystems/dispatchers:latest diff --git a/.travis.yml b/.travis.yml index faca80b..f2c50e9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,12 @@ language: go +env: + - GO111MODULE=on go: - - "1.11" + - "1.13" install: - - go get -u github.com/golang/dep/cmd/dep/... - - curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s -- -b $GOPATH/bin v1.15.0 - - curl -sfL https://github.com/goreleaser/goreleaser/releases/download/v0.101.0/goreleaser_Linux_x86_64.tar.gz | tar xfz - -C $GOPATH/bin goreleaser + - go mod tidy + - curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s -- -b $GOPATH/bin v1.21.0 + - curl -sfL https://github.com/goreleaser/goreleaser/releases/download/v0.123.3/goreleaser_Linux_x86_64.tar.gz | tar xfz - -C $GOPATH/bin goreleaser script: bash ci_check.sh services: - docker diff --git a/Gopkg.lock b/Gopkg.lock deleted file mode 100644 index 0da9f45..0000000 --- a/Gopkg.lock +++ /dev/null @@ -1,110 +0,0 @@ -# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. - - -[[projects]] - digest = "1:c58551e8a153b49289735f346342ae350d17449aac3d4fa7fb80d0f642a99496" - name = "github.com/CyCoreSystems/go-kamailio" - packages = ["binrpc"] - pruneopts = "" - revision = "8c2e3bc5eac9eef2e8ca04546393fd8153760e84" - version = "v0.1.0" - -[[projects]] - digest = "1:68b057539f657c23d72f1a899371bcadbc3c5d787c3f88172193ee21cb0c75e4" - name = "github.com/ericchiang/k8s" - packages = [ - ".", - "apis/apiextensions/v1beta1", - "apis/core/v1", - "apis/meta/v1", - "apis/resource", - "runtime", - "runtime/schema", - "util/intstr", - "watch/versioned", - ] - pruneopts = "" - revision = "677cf3318ef83bf681a38821f81a233a9be09641" - version = "v1.1.0" - -[[projects]] - digest = "1:b13707423743d41665fd23f0c36b2f37bb49c30e94adb813319c44188a51ba22" - name = "github.com/ghodss/yaml" - packages = ["."] - pruneopts = "" - revision = "0ca9ea5df5451ffdf184b4428c902747c2c11cd7" - version = "v1.0.0" - -[[projects]] - digest = "1:f958a1c137db276e52f0b50efee41a1a389dcdded59a69711f3e872757dab34b" - name = "github.com/golang/protobuf" - packages = ["proto"] - pruneopts = "" - revision = "b4deda0973fb4c70b50d226b1af49f3da59f5265" - version = "v1.1.0" - -[[projects]] - digest = "1:7365acd48986e205ccb8652cc746f09c8b7876030d53710ea6ef7d0bd0dcd7ca" - name = "github.com/pkg/errors" - packages = ["."] - pruneopts = "" - revision = "645ef00459ed84a119197bfb8d8205042c6df63d" - version = "v0.8.0" - -[[projects]] - branch = "master" - digest = "1:b03aa98aca70c77c167d79a5223443d4faba32fd933efe94d3d58c9a30738159" - name = "golang.org/x/net" - packages = [ - "http/httpguts", - "http2", - "http2/hpack", - "idna", - "lex/httplex", - ] - pruneopts = "" - revision = "640f4622ab692b87c2f3a94265e6f579fe38263d" - -[[projects]] - digest = "1:5acd3512b047305d49e8763eef7ba423901e85d5dd2fd1e71778a0ea8de10bd4" - name = "golang.org/x/text" - packages = [ - "collate", - "collate/build", - "internal/colltab", - "internal/gen", - "internal/tag", - "internal/triegen", - "internal/ucd", - "language", - "secure/bidirule", - "transform", - "unicode/bidi", - "unicode/cldr", - "unicode/norm", - "unicode/rangetable", - ] - pruneopts = "" - revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0" - version = "v0.3.0" - -[[projects]] - digest = "1:f0620375dd1f6251d9973b5f2596228cc8042e887cd7f827e4220bc1ce8c30e2" - name = "gopkg.in/yaml.v2" - packages = ["."] - pruneopts = "" - revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183" - version = "v2.2.1" - -[solve-meta] - analyzer-name = "dep" - analyzer-version = 1 - input-imports = [ - "github.com/CyCoreSystems/go-kamailio/binrpc", - "github.com/ericchiang/k8s", - "github.com/ericchiang/k8s/apis/core/v1", - "github.com/ghodss/yaml", - "github.com/pkg/errors", - ] - solver-name = "gps-cdcl" - solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml deleted file mode 100644 index 3c3f0c2..0000000 --- a/Gopkg.toml +++ /dev/null @@ -1,38 +0,0 @@ - -# Gopkg.toml example -# -# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md -# for detailed Gopkg.toml documentation. -# -# required = ["github.com/user/thing/cmd/thing"] -# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] -# -# [[constraint]] -# name = "github.com/user/project" -# version = "1.0.0" -# -# [[constraint]] -# name = "github.com/user/project2" -# branch = "dev" -# source = "github.com/myfork/project2" -# -# [[override]] -# name = "github.com/x/y" -# version = "2.4.0" - - -[[constraint]] - name = "github.com/CyCoreSystems/go-kamailio" - version = "0.1.0" - -[[constraint]] - name = "github.com/ericchiang/k8s" - version = "1.1.0" - -[[constraint]] - name = "github.com/ghodss/yaml" - version = "1.0.0" - -[[constraint]] - name = "github.com/pkg/errors" - version = "0.8.0" diff --git a/ci_check.sh b/ci_check.sh index b33a9c9..c2934bf 100755 --- a/ci_check.sh +++ b/ci_check.sh @@ -1,5 +1,4 @@ #!/bin/bash -e -dep ensure golangci-lint run go test ./... go build ./... diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..086eb8b --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module github.com/CyCoreSystems/dispatchers + +go 1.12 + +require ( + github.com/CyCoreSystems/go-kamailio v0.2.0 + github.com/ericchiang/k8s v1.1.0 + github.com/ghodss/yaml v1.0.0 + github.com/golang/protobuf v1.1.0 // indirect + github.com/pkg/errors v0.8.0 + golang.org/x/net v0.0.0-20180502164142-640f4622ab69 // indirect + golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect + golang.org/x/text v0.3.0 // indirect + gopkg.in/yaml.v2 v2.2.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..54bfbe6 --- /dev/null +++ b/go.sum @@ -0,0 +1,20 @@ +github.com/CyCoreSystems/go-kamailio v0.2.0 h1:5B07UodsIQLI3OZ+CtSJX8yaetLI1C0vUNHmGymD42w= +github.com/CyCoreSystems/go-kamailio v0.2.0/go.mod h1:oTRjwxfuRAZWbDbXRkUIBGIhd4bvu/H5Nkr9ozJZVSc= +github.com/ericchiang/k8s v1.1.0 h1:XjBbrZhlvos0PtQrvvSIPAeinnrYM4c/QKB0CWfnoJU= +github.com/ericchiang/k8s v1.1.0/go.mod h1:/OmBgSq2cd9IANnsGHGlEz27nwMZV2YxlpXuQtU3Bz4= +github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/golang/protobuf v1.1.0 h1:0iH4Ffd/meGoXqF2lSAhZHt8X+cPgkfn/cb6Cce5Vpc= +github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +golang.org/x/net v0.0.0-20180502164142-640f4622ab69 h1:+Ybm3UzSfPpp+Hlr62ZTCtbC9DmCKX61f0r74+peGts= +golang.org/x/net v0.0.0-20180502164142-640f4622ab69/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/kamailio/kamailio.go b/kamailio/kamailio.go deleted file mode 100644 index c2f662d..0000000 --- a/kamailio/kamailio.go +++ /dev/null @@ -1,48 +0,0 @@ -package kamailio - -import ( - "io" - "net" - - "github.com/CyCoreSystems/go-kamailio/binrpc" - "github.com/pkg/errors" -) - -type binRPCClientCodec struct { - c io.ReadWriteCloser -} - -func (c *binRPCClientCodec) ReadResponseBody(body interface{}) error { - return nil -} - -func (c *binRPCClientCodec) WriteRequest(name string) error { - var methodName = binrpc.BinRpcString(name) - return methodName.Encode(c.c) -} - -func newClientCodec(conn io.ReadWriteCloser) *binRPCClientCodec { - return &binRPCClientCodec{ - c: conn, - } -} - -// InvokeMethod calls the given RPC method on the given host and port -func InvokeMethod(method string, host string, port string) error { - - conn, err := net.Dial("udp", host+":"+port) - defer conn.Close() // nolint - - if err != nil { - return errors.Wrap(err, "failed to connect to kamailio RPC server") - } - - codec := newClientCodec(conn) - err = codec.WriteRequest(method) - - if err != nil { - return errors.Wrap(err, "failed to invoke RPC method") - } - - return nil -} diff --git a/main.go b/main.go index fe55857..54ed620 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "io" "io/ioutil" "log" + "net/http" "os" "os/signal" "strconv" @@ -14,8 +15,8 @@ import ( "syscall" "time" - "github.com/CyCoreSystems/dispatchers/kamailio" "github.com/CyCoreSystems/dispatchers/sets" + "github.com/CyCoreSystems/go-kamailio/binrpc" "github.com/ericchiang/k8s" "github.com/ghodss/yaml" @@ -30,6 +31,8 @@ var kubeCfg string var maxShortDeaths = 10 var minRuntime = time.Minute +var apiAddr string + // KamailioStartupDebounceTimer is the amount of time to wait on startup to // send an additional notify to kamailio. // @@ -47,6 +50,7 @@ func init() { flag.StringVar(&rpcHost, "h", "127.0.0.1", "Host for kamailio's RPC service") flag.StringVar(&rpcPort, "p", "9998", "Port for kamailio's RPC service") flag.StringVar(&kubeCfg, "kubecfg", "", "Location of kubecfg file (if not running inside k8s)") + flag.StringVar(&apiAddr, "api", "", "Address on which to run web API service. Example ':8080'. (defaults to not run)") } // SetDefinition describes a kubernetes dispatcher set's parameters @@ -92,7 +96,6 @@ func (s *SetDefinition) String() string { // Set configures a kubernetes-derived dispatcher set func (s *SetDefinition) Set(raw string) (err error) { - // Handle multiple comma-delimited arguments if strings.Contains(raw, ",") { args := strings.Split(raw, ",") @@ -105,9 +108,9 @@ func (s *SetDefinition) Set(raw string) (err error) { } var id int - var ns = "default" + ns := "default" var name string - var port = "5060" + port := "5060" if os.Getenv("POD_NAMESPACE") != "" { ns = os.Getenv("POD_NAMESPACE") @@ -118,7 +121,7 @@ func (s *SetDefinition) Set(raw string) (err error) { return fmt.Errorf("failed to parse %s as the form [namespace:]name=index", raw) } - var naming = strings.SplitN(pieces[0], ":", 2) + naming := strings.SplitN(pieces[0], ":", 2) if len(naming) < 2 { name = naming[0] } else { @@ -126,7 +129,7 @@ func (s *SetDefinition) Set(raw string) (err error) { name = naming[1] } - var idString = pieces[1] + idString := pieces[1] if pieces = strings.Split(pieces[1], ":"); len(pieces) > 1 { idString = pieces[0] port = pieces[1] @@ -156,7 +159,6 @@ type dispatcherSets struct { // add creates a dispatcher set from a k8s set definition func (s *dispatcherSets) add(ctx context.Context, args *SetDefinition) error { - ds, err := sets.NewKubernetesSet(ctx, s.kc, args.id, args.namespace, args.name, args.port) if err != nil { return errors.Wrap(err, "failed to create kubernetes-based dispatcher set") @@ -174,7 +176,6 @@ func (s *dispatcherSets) add(ctx context.Context, args *SetDefinition) error { // export dumps the output from all dispatcher sets func (s *dispatcherSets) export() error { - f, err := os.Create(s.outputFilename) if err != nil { return errors.Wrap(err, "failed to open dispatchers file for writing") @@ -239,9 +240,47 @@ func (s *dispatcherSets) maintain(ctx context.Context) error { return ctx.Err() } +// ServeHTTP offers a web service by which clients may validate membership of an IP address within a dispatcher set +func (s *dispatcherSets) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Handle requests for /check// to validate membership of an IP to a dispatcher set + if strings.HasPrefix(r.URL.Path, "/check/") { + pieces := strings.Split(r.URL.Path, "/") + if len(pieces) != 3 { + w.WriteHeader(http.StatusBadRequest) + return + } + setID, err := strconv.Atoi(pieces[1]) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + if s.validateSetMember(setID, pieces[2]) { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(http.StatusNotFound) + return + } + + w.WriteHeader(http.StatusNotFound) +} + +func (s *dispatcherSets) validateSetMember(id int, addr string) bool { + selectedSet, ok := s.sets[id] + if !ok { + return false + } + for _, ref := range selectedSet.Hosts() { + if ref == addr { + return true + } + } + return false +} + // notify signals to kamailio to reload its dispatcher list func (s *dispatcherSets) notify() error { - return kamailio.InvokeMethod("dispatcher.reload", s.rpcHost, s.rpcPort) + return binrpc.InvokeMethod("dispatcher.reload", s.rpcHost, s.rpcPort) } func main() { @@ -276,7 +315,7 @@ func run() error { os.Exit(1) } - var s = &dispatcherSets{ + s := &dispatcherSets{ kc: kc, outputFilename: outputFilename, rpcHost: rpcHost, @@ -312,6 +351,22 @@ func run() error { } }) + // Run a web service to offer IP checks for each member of the dispatcher set + if apiAddr != "" { + var srv http.Server + go func() { + <-ctx.Done() + if err := srv.Shutdown(ctx); err != nil { + log.Fatalln("failed to shut down HTTP server:", err) + } + }() + go func() { + if err := srv.ListenAndServe(); err != http.ErrServerClosed { + log.Fatalln("failed to start HTTP server:", err) + } + }() + } + for ctx.Err() == nil { err = s.maintain(ctx) if errors.Cause(err) == io.EOF { diff --git a/sets/sets.go b/sets/sets.go index 5646b45..deafd4e 100644 --- a/sets/sets.go +++ b/sets/sets.go @@ -22,6 +22,9 @@ type DispatcherSet interface { // ID returns the dispatcher set ID ID() int + // Hosts returns the set addresses of the members of the dispatcher set + Hosts() []string + // Export dumps the dispatcher set Export() string @@ -44,8 +47,12 @@ func (s *staticSet) ID() int { return s.id } +func (s *staticSet) Hosts() []string { + return s.Members +} + func (s *staticSet) Export() string { - var ret = fmt.Sprintf("# Dispatcher set %d\n", s.id) + ret := fmt.Sprintf("# Dispatcher set %d\n", s.id) for _, m := range s.Members { ret += fmt.Sprintf("%d sip:%s", s.id, m) @@ -144,8 +151,12 @@ func (s *kubernetesSet) ID() int { return s.id } +func (s *kubernetesSet) Hosts() []string { + return s.members +} + func (s *kubernetesSet) Export() string { - var ret = fmt.Sprintf("# Dispatcher set %d\n", s.id) + ret := fmt.Sprintf("# Dispatcher set %d\n", s.id) for _, m := range s.members { ret += fmt.Sprintf("%d sip:%s:%s\n", s.id, m, s.port) @@ -170,7 +181,6 @@ func (s *kubernetesSet) Update(ctx context.Context) (changed bool, err error) { } func (s *kubernetesSet) Watch(ctx context.Context) (string, error) { - for ctx.Err() == nil { select { case err := <-s.changes: