diff --git a/check-licenses/check_licenses.go b/check-licenses/check_licenses.go index f71b9764..8bfb11f9 100644 --- a/check-licenses/check_licenses.go +++ b/check-licenses/check_licenses.go @@ -23,8 +23,8 @@ import ( "regexp" "strings" + "github.com/projectcalico/libcalico-go/lib/set" "github.com/projectcalico/typha/pkg/logutils" - "github.com/projectcalico/typha/pkg/set" ) var ( diff --git a/glide.lock b/glide.lock index 269fe680..5b9df276 100644 --- a/glide.lock +++ b/glide.lock @@ -1,12 +1,12 @@ -hash: 1847cdf106df6e8085d241e5910354da8346074adbb0dda7033ad4708701a92e -updated: 2017-07-19T10:15:13.551306815Z +hash: bf35a63bb5c663298d6bc89a2682099a7d0abbc895f71d84a4d76d55e009e3de +updated: 2017-07-20T08:42:22.874117758Z imports: - name: github.com/beorn7/perks version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 subpackages: - quantile - name: github.com/coreos/etcd - version: ae23b0ef2f1f6708f9a34d287a28ab02767c2a16 + version: c31bec0f29facff13f7c3e3d948e55dd6689ed42 subpackages: - client - pkg/pathutil @@ -114,7 +114,7 @@ imports: - name: github.com/projectcalico/go-yaml-wrapper version: 598e54215bee41a19677faa4f0c32acd2a87eb56 - name: github.com/projectcalico/libcalico-go - version: 639aa7b2d33b94419143a2e6987d2d77aa395f2f + version: e1ab5e5bf4a57ea6fa71bf4a7712af27b824a005 subpackages: - lib - lib/api @@ -131,6 +131,7 @@ imports: - lib/converter - lib/errors - lib/hash + - lib/health - lib/hwm - lib/ipip - lib/net @@ -139,6 +140,7 @@ imports: - lib/selector - lib/selector/parser - lib/selector/tokenizer + - lib/set - lib/testutils - lib/validator - name: github.com/prometheus/client_golang @@ -194,7 +196,7 @@ imports: - idna/ - lex/httplex - name: golang.org/x/sys - version: cd2c276457edda6df7fb04895d3fd6a6add42926 + version: 7a4fde3fda8ef580a89dbae8138c26041be14299 subpackages: - unix - name: golang.org/x/text diff --git a/glide.yaml b/glide.yaml index ace7d439..3cfc0ec9 100644 --- a/glide.yaml +++ b/glide.yaml @@ -27,7 +27,7 @@ import: - package: github.com/go-ini/ini version: ^1.21.0 - package: github.com/projectcalico/libcalico-go - version: 639aa7b2d33b94419143a2e6987d2d77aa395f2f + version: e1ab5e5bf4a57ea6fa71bf4a7712af27b824a005 subpackages: - lib - package: github.com/Sirupsen/logrus @@ -58,4 +58,3 @@ import: version: ^0.3.0 testImport: - package: github.com/onsi/gomega - version: 9b8c753e8dfb382618ba8fa19b4197b5dcb0434c diff --git a/pkg/config/config_params.go b/pkg/config/config_params.go index 57bba813..d0652b78 100644 --- a/pkg/config/config_params.go +++ b/pkg/config/config_params.go @@ -103,6 +103,8 @@ type Config struct { LogSeverityScreen string `config:"oneof(DEBUG,INFO,WARNING,ERROR,CRITICAL);INFO"` LogSeveritySys string `config:"oneof(DEBUG,INFO,WARNING,ERROR,CRITICAL);INFO"` + HealthEnabled bool `config:"bool;false"` + HealthPort int `config:"int(0,65535);9098"` PrometheusMetricsEnabled bool `config:"bool;false"` PrometheusMetricsPort int `config:"int(0,65535);9093"` PrometheusGoMetricsEnabled bool `config:"bool;true"` diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index feef43c1..4bd981c7 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -29,13 +29,14 @@ import ( "time" log "github.com/Sirupsen/logrus" - "github.com/docopt/docopt-go" + docopt "github.com/docopt/docopt-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/projectcalico/libcalico-go/lib/api" "github.com/projectcalico/libcalico-go/lib/backend" bapi "github.com/projectcalico/libcalico-go/lib/backend/api" + "github.com/projectcalico/libcalico-go/lib/health" "github.com/projectcalico/typha/pkg/buildinfo" "github.com/projectcalico/typha/pkg/calc" "github.com/projectcalico/typha/pkg/config" @@ -77,6 +78,9 @@ type TyphaDaemon struct { NewBackendClient func(config api.CalicoAPIConfig) (BackendClient, error) ConfigureEarlyLogging func() ConfigureLogging func(configParams *config.Config) + + // Health monitoring. + healthAggregator *health.HealthAggregator } func New() *TyphaDaemon { @@ -215,6 +219,9 @@ configRetry: func (t *TyphaDaemon) CreateServer() { // Now create the Syncer; our caching layer and the TCP server. + // Health monitoring, for liveness and readiness endpoints. + t.healthAggregator = health.NewHealthAggregator() + // Get a Syncer from the datastore, which will feed the validator layer with updates. t.SyncerToValidator = calc.NewSyncerCallbacksDecoupler() t.Syncer = t.DatastoreClient.Syncer(t.SyncerToValidator) @@ -226,7 +233,8 @@ func (t *TyphaDaemon) CreateServer() { // Create our snapshot cache, which stores point-in-time copies of the datastore contents. t.Cache = snapcache.New(snapcache.Config{ - MaxBatchSize: t.ConfigParams.SnapshotCacheMaxBatchSize, + MaxBatchSize: t.ConfigParams.SnapshotCacheMaxBatchSize, + HealthAggregator: t.healthAggregator, }) // Create the server, which listens for connections from Felix. @@ -241,6 +249,7 @@ func (t *TyphaDaemon) CreateServer() { DropInterval: t.ConfigParams.ConnectionDropIntervalSecs, MaxConns: t.ConfigParams.MaxConnectionsUpperLimit, Port: t.ConfigParams.ServerPort, + HealthAggregator: t.healthAggregator, }, ) } @@ -268,6 +277,11 @@ func (t *TyphaDaemon) Start(cxt context.Context) { log.Info("Prometheus metrics enabled. Starting server.") go servePrometheusMetrics(t.ConfigParams) } + + if t.ConfigParams.HealthEnabled { + log.WithField("port", t.ConfigParams.HealthPort).Info("Health enabled. Starting server.") + go t.healthAggregator.ServeHTTP(t.ConfigParams.HealthPort) + } } // WaitAndShutDown waits for OS signals or context.Done() and exits as appropriate. diff --git a/pkg/k8s/lookup.go b/pkg/k8s/lookup.go index 0c867e8a..cd17d348 100644 --- a/pkg/k8s/lookup.go +++ b/pkg/k8s/lookup.go @@ -20,7 +20,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "github.com/projectcalico/typha/pkg/set" + "github.com/projectcalico/libcalico-go/lib/set" ) func NewK8sAPI() *RealK8sAPI { diff --git a/pkg/set/set.go b/pkg/set/set.go deleted file mode 100644 index 0cdc70a3..00000000 --- a/pkg/set/set.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package set - -import ( - "errors" - "reflect" - - log "github.com/Sirupsen/logrus" -) - -type Set interface { - Len() int - Add(interface{}) - AddAll(itemArray interface{}) - Discard(interface{}) - Clear() - Contains(interface{}) bool - Iter(func(item interface{}) error) - Copy() Set - Equals(Set) bool -} - -type empty struct{} - -var emptyValue = empty{} - -var ( - StopIteration = errors.New("Stop iteration") - RemoveItem = errors.New("Remove item") -) - -func New() Set { - return make(mapSet) -} - -func From(members ...interface{}) Set { - s := New() - s.AddAll(members) - return s -} - -func FromArray(membersArray interface{}) Set { - s := New() - s.AddAll(membersArray) - return s -} - -func Empty() Set { - return mapSet(nil) -} - -type mapSet map[interface{}]empty - -func (set mapSet) Len() int { - return len(set) -} - -func (set mapSet) Add(item interface{}) { - set[item] = emptyValue -} - -func (set mapSet) AddAll(itemArray interface{}) { - - arrVal := reflect.ValueOf(itemArray) - for i := 0; i < arrVal.Len(); i++ { - set.Add(arrVal.Index(i).Interface()) - } -} - -func (set mapSet) Discard(item interface{}) { - delete(set, item) -} - -func (set mapSet) Clear() { - for item := range set { - delete(set, item) - } -} - -func (set mapSet) Contains(item interface{}) bool { - _, present := set[item] - return present -} - -func (set mapSet) Iter(visitor func(item interface{}) error) { -loop: - for item := range set { - err := visitor(item) - switch err { - case StopIteration: - break loop - case RemoveItem: - delete(set, item) - case nil: - break - default: - log.WithError(err).Panic("Unexpected iteration error") - } - } -} - -func (set mapSet) Copy() Set { - cpy := New() - for item := range set { - cpy.Add(item) - } - return cpy -} - -func (set mapSet) Equals(other Set) bool { - if set.Len() != other.Len() { - return false - } - for item := range set { - if !other.Contains(item) { - return false - } - } - return true -} diff --git a/pkg/set/set_suite_test.go b/pkg/set/set_suite_test.go deleted file mode 100644 index 072c66c8..00000000 --- a/pkg/set/set_suite_test.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package set_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "testing" - - "github.com/projectcalico/libcalico-go/lib/testutils" -) - -func init() { - testutils.HookLogrusForGinkgo() -} - -func TestSet(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Set Suite") -} diff --git a/pkg/set/set_test.go b/pkg/set/set_test.go deleted file mode 100644 index 0994702f..00000000 --- a/pkg/set/set_test.go +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package set_test - -import ( - "github.com/projectcalico/typha/pkg/set" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -var _ = Describe("Set", func() { - var s set.Set - BeforeEach(func() { - s = set.New() - }) - - It("should be empty", func() { - Expect(s.Len()).To(BeZero()) - }) - It("should iterate over no items", func() { - called := false - s.Iter(func(item interface{}) error { - called = true - return nil - }) - Expect(called).To(BeFalse()) - }) - It("should do nothing on clear", func() { - s.Clear() - Expect(s.Len()).To(BeZero()) - }) - - Describe("Set created by FromArray", func() { - BeforeEach(func() { - s = set.FromArray([]int{1, 2}) - }) - It("should contain 1", func() { - Expect(s.Contains(1)).To(BeTrue()) - }) - It("should contain 2", func() { - Expect(s.Contains(2)).To(BeTrue()) - }) - It("should not contain 3", func() { - Expect(s.Contains(3)).To(BeFalse()) - }) - }) - - Describe("Set created by From", func() { - BeforeEach(func() { - s = set.From(1, 2) - }) - It("should contain 1", func() { - Expect(s.Contains(1)).To(BeTrue()) - }) - It("should contain 2", func() { - Expect(s.Contains(2)).To(BeTrue()) - }) - It("should not contain 3", func() { - Expect(s.Contains(3)).To(BeFalse()) - }) - }) - - Describe("after adding 1 and 2", func() { - BeforeEach(func() { - s.Add(1) - s.Add(2) - s.Add(2) // Duplicate should have no effect - }) - It("should contain 1", func() { - Expect(s.Contains(1)).To(BeTrue()) - }) - It("should contain 2", func() { - Expect(s.Contains(2)).To(BeTrue()) - }) - It("should not contain 3", func() { - Expect(s.Contains(3)).To(BeFalse()) - }) - It("should iterate over 1 and 2 in some order", func() { - seen1 := false - seen2 := false - s.Iter(func(item interface{}) error { - if item.(int) == 1 { - Expect(seen1).To(BeFalse()) - seen1 = true - } else if item.(int) == 2 { - Expect(seen2).To(BeFalse()) - seen2 = true - } else { - Fail("Unexpected item") - } - return nil - }) - Expect(seen1).To(BeTrue()) - Expect(seen2).To(BeTrue()) - }) - It("should allow remove during iteration", func() { - s.Iter(func(item interface{}) error { - if item.(int) == 1 { - return set.RemoveItem - } - return nil - }) - Expect(s.Contains(1)).To(BeFalse()) - Expect(s.Contains(2)).To(BeTrue()) - }) - It("should support stopping iteration", func() { - iterationStarted := false - s.Iter(func(item interface{}) error { - if iterationStarted { - Fail("Iteration continued after stop") - } - iterationStarted = true - return set.StopIteration - }) - Expect(s.Contains(1)).To(BeTrue()) - Expect(s.Contains(2)).To(BeTrue()) - }) - It("can copy a Set", func() { - c := s.Copy() - Expect(c.Len()).To(Equal(s.Len())) - Expect(c).NotTo(BeIdenticalTo(s)) // Check they're not the same object. - Expect(c).To(Equal(s)) // DeepEquals, will check the contents. - }) - It("should correctly determine set equality", func() { - c := s.Copy() - Expect(c.Equals(s)).To(BeTrue()) - Expect(s.Equals(c)).To(BeTrue()) - c.Add(3) - Expect(c.Equals(s)).To(BeFalse()) - Expect(s.Equals(c)).To(BeFalse()) - c.Discard(2) - Expect(c.Equals(s)).To(BeFalse()) - Expect(s.Equals(c)).To(BeFalse()) - c.Add(2) - c.Discard(3) - Expect(c.Equals(s)).To(BeTrue()) - Expect(s.Equals(c)).To(BeTrue()) - }) - - Describe("after removing 2", func() { - BeforeEach(func() { - s.Discard(2) - }) - It("should contain 1", func() { - Expect(s.Contains(1)).To(BeTrue()) - }) - It("should not contain 2", func() { - Expect(s.Contains(2)).To(BeFalse()) - }) - It("should not contain 3", func() { - Expect(s.Contains(3)).To(BeFalse()) - }) - }) - Describe("after using AddAll to add 2, 3, 4", func() { - BeforeEach(func() { - s.AddAll([]int{2, 3, 4}) - }) - It("should contain 1", func() { - Expect(s.Contains(1)).To(BeTrue()) - }) - It("should contain 2", func() { - Expect(s.Contains(2)).To(BeTrue()) - }) - It("should contain 3", func() { - Expect(s.Contains(3)).To(BeTrue()) - }) - It("should contain 4", func() { - Expect(s.Contains(4)).To(BeTrue()) - }) - }) - - Describe("after Clear()", func() { - BeforeEach(func() { - s.Clear() - }) - It("should be empty", func() { - Expect(s.Len()).To(BeZero()) - }) - }) - }) -}) - -var _ = Describe("EmptySet", func() { - var empty set.Set - BeforeEach(func() { - empty = set.Empty() - }) - It("has length 0", func() { - Expect(empty.Len()).To(Equal(0)) - }) - It("should panic on add", func() { - Expect(func() { empty.Add("foo") }).To(Panic()) - }) - It("should ignore discard", func() { - Expect(func() { empty.Discard("foo") }).NotTo(Panic()) - }) - It("should iterate 0 times", func() { - empty.Iter(func(item interface{}) error { - Fail("Iterated > 0 times") - return nil - }) - }) -}) diff --git a/pkg/snapcache/cache.go b/pkg/snapcache/cache.go index ded77484..6d82adf2 100644 --- a/pkg/snapcache/cache.go +++ b/pkg/snapcache/cache.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/projectcalico/libcalico-go/lib/backend/api" + "github.com/projectcalico/libcalico-go/lib/health" "github.com/projectcalico/typha/pkg/jitter" "github.com/projectcalico/typha/pkg/syncproto" ) @@ -125,11 +126,18 @@ type Cache struct { currentBreadcrumb unsafe.Pointer wakeUpTicker *jitter.Ticker + healthTicks <-chan time.Time } +const ( + healthName = "cache" + healthInterval = 10 * time.Second +) + type Config struct { - MaxBatchSize int - WakeUpInterval time.Duration + MaxBatchSize int + WakeUpInterval time.Duration + HealthAggregator *health.HealthAggregator } func (config *Config) ApplyDefaults() { @@ -158,14 +166,20 @@ func New(config Config) *Cache { nextCond: cond, KVs: kvs.ReadOnlySnapshot(), } - return &Cache{ + c := &Cache{ config: config, inputC: make(chan interface{}, config.MaxBatchSize*2), breadcrumbCond: cond, kvs: kvs, currentBreadcrumb: (unsafe.Pointer)(snap), wakeUpTicker: jitter.NewTicker(config.WakeUpInterval, config.WakeUpInterval/10), + healthTicks: time.NewTicker(healthInterval).C, + } + if config.HealthAggregator != nil { + config.HealthAggregator.RegisterReporter(healthName, &health.HealthReport{Live: true, Ready: true}, healthInterval*2) } + c.reportHealth() + return c } // CurrentBreadcrumb returns the current Breadcrumb, which contains a snapshot of the datastore @@ -252,10 +266,21 @@ func (c *Cache) fillBatchFromInputQueue(ctx context.Context) error { // wake all the clients so they can check if their Context is done. log.Debug("Waking all clients.") c.breadcrumbCond.Broadcast() + case <-c.healthTicks: + c.reportHealth() } return ctx.Err() } +func (c *Cache) reportHealth() { + if c.config.HealthAggregator != nil { + c.config.HealthAggregator.Report(healthName, &health.HealthReport{ + Live: true, + Ready: c.pendingStatus == api.InSync, + }) + } +} + // publishBreadcrumbs sends a series of Breadcrumbs, draining the pending updates list. func (c *Cache) publishBreadcrumbs() { for { diff --git a/pkg/syncserver/sync_server.go b/pkg/syncserver/sync_server.go index bbb53df2..6e8d415f 100644 --- a/pkg/syncserver/sync_server.go +++ b/pkg/syncserver/sync_server.go @@ -28,6 +28,7 @@ import ( "math" "github.com/projectcalico/libcalico-go/lib/backend/api" + "github.com/projectcalico/libcalico-go/lib/health" "github.com/projectcalico/typha/pkg/buildinfo" "github.com/projectcalico/typha/pkg/jitter" "github.com/projectcalico/typha/pkg/snapcache" @@ -132,8 +133,14 @@ type Config struct { PongTimeout time.Duration DropInterval time.Duration MaxConns int + HealthAggregator *health.HealthAggregator } +const ( + healthName = "sync_server" + healthInterval = 10 * time.Second +) + func (c *Config) ApplyDefaults() { if c.MaxMessageSize < 1 { log.WithFields(log.Fields{ @@ -205,7 +212,7 @@ func (c *Config) ListenPort() int { func New(cache BreadcrumbProvider, config Config) *Server { config.ApplyDefaults() log.WithField("config", config).Info("Creating server") - return &Server{ + s := &Server{ config: config, cache: cache, maxConnsC: make(chan int), @@ -214,6 +221,17 @@ func New(cache BreadcrumbProvider, config Config) *Server { connIDToConn: map[uint64]*connection{}, listeningC: make(chan struct{}), } + + // Register that we will report liveness. + if config.HealthAggregator != nil { + config.HealthAggregator.RegisterReporter( + healthName, + &health.HealthReport{Live: true}, + healthInterval*2, + ) + } + + return s } func (s *Server) Start(cxt context.Context) { @@ -333,6 +351,8 @@ func (s *Server) governNumberOfConnections(cxt context.Context) { logCxt := log.WithField("thread", "numConnsGov") maxConns := s.maxConns ticker := jitter.NewTicker(s.dropInterval, s.dropInterval/10) + healthTicks := time.NewTicker(healthInterval).C + s.reportHealth() for { select { case newMax := <-s.maxConnsC: @@ -370,10 +390,18 @@ func (s *Server) governNumberOfConnections(cxt context.Context) { case <-cxt.Done(): logCxt.Info("Context asked us to stop") return + case <-healthTicks: + s.reportHealth() } } } +func (s *Server) reportHealth() { + if s.config.HealthAggregator != nil { + s.config.HealthAggregator.Report(healthName, &health.HealthReport{Live: true}) + } +} + type connection struct { ID uint64 config *Config