diff --git a/main.go b/main.go index 5c7b5a49a..5b4074037 100644 --- a/main.go +++ b/main.go @@ -129,7 +129,7 @@ func setupTriggers(ctx context.Context, k8sImplementer kubernetes.Implementer, p return } - ps, err := pubsub.NewSubscriber(&pubsub.Opts{ + ps, err := pubsub.NewPubsubSubscriber(&pubsub.Opts{ ProjectID: projectID, Providers: providers, }) diff --git a/provider/kubernetes/kubernetes.go b/provider/kubernetes/kubernetes.go index 3332c9db4..a012f0fe4 100644 --- a/provider/kubernetes/kubernetes.go +++ b/provider/kubernetes/kubernetes.go @@ -118,8 +118,6 @@ func (p *Provider) updateDeployments(deployments []v1beta1.Deployment) (updated // getDeployment - helper function to get specific deployment func (p *Provider) getDeployment(namespace, name string) (*v1beta1.Deployment, error) { - // dep := p.client.Extensions().Deployments(namespace) - // return dep.Get(name, meta_v1.GetOptions{}) return p.implementer.Deployment(namespace, name) } @@ -157,6 +155,8 @@ func (p *Provider) impactedDeployments(repo *types.Repository) ([]v1beta1.Deploy "policy": policy, }).Info("provider.kubernetes: keel policy found, checking deployment...") + shouldUpdateDeployment := false + for idx, c := range deployment.Spec.Template.Spec.Containers { // Remove version if any containerImageName := versionreg.ReplaceAllString(c.Image, "") @@ -194,7 +194,7 @@ func (p *Provider) impactedDeployments(repo *types.Repository) ([]v1beta1.Deploy "policy": policy, }).Info("provider.kubernetes: current image version") - shouldUpdate, err := version.ShouldUpdate(currentVersion, newVersion, policy) + shouldUpdateContainer, err := version.ShouldUpdate(currentVersion, newVersion, policy) if err != nil { log.WithFields(log.Fields{ "error": err, @@ -213,14 +213,16 @@ func (p *Provider) impactedDeployments(repo *types.Repository) ([]v1beta1.Deploy "current_version": currentVersion.String(), "new_version": newVersion.String(), "policy": policy, - "should_update": shouldUpdate, + "should_update": shouldUpdateContainer, }).Info("provider.kubernetes: checked version, deciding whether to update") - if shouldUpdate { + if shouldUpdateContainer { // updating image c.Image = fmt.Sprintf("%s:%s", containerImageName, newVersion.String()) deployment.Spec.Template.Spec.Containers[idx] = c - impacted = append(impacted, deployment) + // marking this deployment for update + shouldUpdateDeployment = true + log.WithFields(log.Fields{ "parsed_image": containerImageName, "raw_image_name": c.Image, @@ -230,6 +232,10 @@ func (p *Provider) impactedDeployments(repo *types.Repository) ([]v1beta1.Deploy }).Info("provider.kubernetes: impacted deployment container found") } } + + if shouldUpdateDeployment { + impacted = append(impacted, deployment) + } } } diff --git a/provider/kubernetes/kubernetes_test.go b/provider/kubernetes/kubernetes_test.go index 1f0e6113f..fa1c75e6f 100644 --- a/provider/kubernetes/kubernetes_test.go +++ b/provider/kubernetes/kubernetes_test.go @@ -398,9 +398,6 @@ func TestGetImpactedTwoContainersInSameDeployment(t *testing.T) { func TestGetImpactedTwoSameContainersInSameDeployment(t *testing.T) { - // FIXME: enable once sorted - t.Skip() - fp := &fakeImplementer{} fp.namespaces = &v1.NamespaceList{ Items: []v1.Namespace{ diff --git a/readme.md b/readme.md index 1e83d1136..d6ad5311a 100644 --- a/readme.md +++ b/readme.md @@ -1,6 +1,6 @@ # Keel - automated Kubernetes deployments for the rest of us -Lightweight (11MB image size, uses 12MB RAM when running) [Kubernetes](https://kubernetes.io/) controller for automating image updates for deployments. Keel uses [semantic versioning](http://semver.org/) to determine whether deployment needs an update or not. Currently keel has several types of triggers: +Lightweight (uses ~10MB RAM when running) [Kubernetes](https://kubernetes.io/) controller for automating deployment updates when new image is available. Keel uses [semantic versioning](http://semver.org/) to determine whether deployment needs an update or not. Currently keel has several types of triggers: * Google's pubsub integration with [Google Container Registry](https://cloud.google.com/container-registry/) * Webhooks @@ -9,20 +9,22 @@ Upcomming integrations: * DockerHub webhooks -## Why? -I have built Keel since I have a relatively small Golang project which doesn't use a lot of memory and introducing an antique, heavy weight CI solution with lots dependencies seemed like a terrible idea. +## Keel overview -You should consider using Keel: -* If you are not Netflix, Google, Amazon, {insert big company here} - you might not want to run something like Spinnaker that has heavy dependencies such as "JDK8, Redis, Cassandra, Packer". You probably need something lightweight, stateless, that you don't have to think about. -* If you are not a bank that uses RedHat's OpenShift which embedded Jenkins that probably already does what Keel is doing. -* You want automated Kubernetes deployment updates. +* Stateless, runs as a single container in kube-system namespace +* Automatically detects images that you have in your Kubernetes environment and configures relevant [Google Cloud pubsub](https://cloud.google.com/pubsub/) topic, subscriptions. +* Updates deployment if you have set Keel policy and newer image is available. -Here is a list of Keel dependencies: +## Why? -1. +I have built Keel since I have a relatively small Golang project which doesn't use a lot of memory and introducing an antique, heavy weight CI solution with lots dependencies seemed like a terrible idea. + +You should consider using Keel if: +* You don't want your "Continous Delivery" tool to consume more resources than your actual deployment does. +* You are __not__ Netflix, Google, Amazon, {insert big company here} that already has something like Spinnaker that has too many dependencies such as "JDK8, Redis, Cassandra, Packer". +* You want simple, automated Kubernetes deployment updates. -Yes, none. ## Getting started @@ -70,10 +72,10 @@ spec: Available policy options: -* all - update whenever there is a version bump -* major - update major versions -* minor - update only minor versions (ignores major) -* patch - update only patch versions (ignores minor and major versions) +* __all__ - update whenever there is a version bump +* __major__ - update major versions +* __minor__ - update only minor versions (ignores major) +* __patch__ - update only patch versions (ignores minor and major versions) ## Deployment @@ -87,17 +89,15 @@ gcloud container node-pools create new-pool --cluster CLUSTER_NAME --scopes http ### Step 2: Kubernetes -Since keel will be updating deployments, let's create a new [service account](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/) in `kube-system` namespace: +Keel will be updating deployments, let's create a new [service account](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/) in `kube-system` namespace: ``` kubectl create serviceaccount keel --namespace=kube-system ``` - -Now, edit [deployment file](https://github.com/rusenask/keel/blob/master/hack/deployment.sample.yml) that is supplied with the repo (basically point to the newest keel release and set your PROJECT_ID to the actual project ID that you have): +Now, edit [deployment file](https://github.com/rusenask/keel/blob/master/hack/deployment.sample.yml) that is supplied with the repository (basically point to the [newest Keel release](https://hub.docker.com/r/karolisr/keel/tags/) and set your PROJECT_ID to the actual project ID that you have): ``` kubectl create -f hack/deployment.yml ``` -Once Keel is deployed in your Kubernetes cluster - it occasionally scans your current deployments and looks for ones that have label _keel.observer/policy_. It then checks whether appropriate subscriptions and topics are set for GCR registries, if not - auto-creates them. - +Once Keel is deployed in your Kubernetes cluster - it occasionally scans your current deployments and looks for ones that have label _keel.observer/policy_. It then checks whether appropriate subscriptions and topics are set for GCR registries, if not - auto-creates them. \ No newline at end of file diff --git a/trigger/pubsub/manager.go b/trigger/pubsub/manager.go index 03723b041..094a522b9 100644 --- a/trigger/pubsub/manager.go +++ b/trigger/pubsub/manager.go @@ -18,7 +18,7 @@ import ( type DefaultManager struct { implementer kubernetes.Implementer - client *Subscriber + client Subscriber // existing subscribers mu *sync.Mutex // a map of GCR URIs and subscribers to those URIs @@ -35,8 +35,12 @@ type DefaultManager struct { ctx context.Context } +type Subscriber interface { + Subscribe(ctx context.Context, topic, subscription string) error +} + // NewDefaultManager - creates new pubsub manager to create subscription for deployments -func NewDefaultManager(projectID string, implementer kubernetes.Implementer, subClient *Subscriber) *DefaultManager { +func NewDefaultManager(projectID string, implementer kubernetes.Implementer, subClient Subscriber) *DefaultManager { return &DefaultManager{ implementer: implementer, client: subClient, @@ -65,7 +69,7 @@ func (s *DefaultManager) Start(ctx context.Context) error { case <-ctx.Done(): return nil default: - log.Info("performing scan") + log.Debug("performing scan") err := s.scan(ctx) if err != nil { log.WithFields(log.Fields{ @@ -114,10 +118,31 @@ func (s *DefaultManager) subscribed(gcrURI string) bool { return ok } -func (s *DefaultManager) addSubscription(ctx context.Context, gcrURI string) { +func (s *DefaultManager) ensureSubscription(gcrURI string) { s.mu.Lock() defer s.mu.Unlock() - s.subscribers[gcrURI] = ctx + + _, ok := s.subscribers[gcrURI] + if !ok { + ctx, cancel := context.WithCancel(s.ctx) + s.subscribers[gcrURI] = ctx + subName := containerRegistrySubName(s.projectID, gcrURI) + go func() { + defer cancel() + err := s.client.Subscribe(s.ctx, gcrURI, subName) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "gcr_uri": gcrURI, + "subscription_name": subName, + }).Error("trigger.pubsub.manager: failed to create subscription") + } + + // cleanup + s.removeSubscription(gcrURI) + + }() + } } func (s *DefaultManager) removeSubscription(gcrURI string) { @@ -141,34 +166,7 @@ func (s *DefaultManager) checkDeployment(deployment *v1beta1.Deployment) error { // uri gcrURI := containerRegistryURI(s.projectID, registry) - - // existing sub - ok := s.subscribed(gcrURI) - if !ok { - // create sub in a separate goroutine since client.Subscribe is a blocking call - go func() { - ctx, cancel := context.WithCancel(s.ctx) - defer cancel() - s.addSubscription(ctx, gcrURI) - defer s.removeSubscription(gcrURI) - subName := containerRegistrySubName(s.projectID, gcrURI) - err := s.client.Subscribe(s.ctx, gcrURI, subName) - if err != nil { - log.WithFields(log.Fields{ - "error": err, - "gcr_uri": gcrURI, - "subscription_name": subName, - }).Error("trigger.pubsub.manager: failed to create subscription") - } - }() - } else { - log.WithFields(log.Fields{ - "registry": registry, - "gcr_uri": gcrURI, - "deployment": deployment.Name, - "image_name": c.Image, - }).Debug("trigger.pubsub.manager: existing subscription for deployment's image found") - } + s.ensureSubscription(gcrURI) } diff --git a/trigger/pubsub/manager_test.go b/trigger/pubsub/manager_test.go new file mode 100644 index 000000000..b79ab7743 --- /dev/null +++ b/trigger/pubsub/manager_test.go @@ -0,0 +1,80 @@ +package pubsub + +import ( + "golang.org/x/net/context" + "sync" + "time" + + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/apis/extensions/v1beta1" + + "github.com/rusenask/keel/types" + + "testing" +) + +type fakeSubscriber struct { + TimesSubscribed int + SubscribedTopicName string + SubscribedSubName string +} + +func (s *fakeSubscriber) Subscribe(ctx context.Context, topic, subscription string) error { + s.TimesSubscribed++ + s.SubscribedTopicName = topic + s.SubscribedSubName = subscription + for { + select { + case <-ctx.Done(): + return nil + } + } +} + +func TestCheckDeployment(t *testing.T) { + fs := &fakeSubscriber{} + mng := &DefaultManager{ + client: fs, + mu: &sync.Mutex{}, + ctx: context.Background(), + subscribers: make(map[string]context.Context), + } + + dep := &v1beta1.Deployment{ + meta_v1.TypeMeta{}, + meta_v1.ObjectMeta{ + Name: "dep-1", + Namespace: "xxxx", + Labels: map[string]string{types.KeelPolicyLabel: "all"}, + }, + v1beta1.DeploymentSpec{ + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Image: "gcr.io/v2-namespace/hello-world:1.1.1", + }, + v1.Container{ + Image: "gcr.io/v2-namespace/greetings-world:1.1.1", + }, + }, + }, + }, + }, + v1beta1.DeploymentStatus{}, + } + + err := mng.checkDeployment(dep) + if err != nil { + t.Errorf("deployment check failed: %s", err) + } + + // sleeping a bit since our fake subscriber goes into a separate goroutine + time.Sleep(100 * time.Millisecond) + + if fs.TimesSubscribed != 1 { + t.Errorf("expected to find one subscription, found: %d", fs.TimesSubscribed) + } + +} diff --git a/trigger/pubsub/pubsub.go b/trigger/pubsub/pubsub.go index 419938a34..06f872965 100644 --- a/trigger/pubsub/pubsub.go +++ b/trigger/pubsub/pubsub.go @@ -15,7 +15,8 @@ import ( log "github.com/Sirupsen/logrus" ) -type Subscriber struct { +// PubsubSubscriber is Google Cloud pubsub based subscriber +type PubsubSubscriber struct { providers map[string]provider.Provider project string @@ -30,18 +31,20 @@ type pubsubImplementer interface { Receive(ctx context.Context, f func(context.Context, *Message)) error } +// Opts - subscriber options type Opts struct { ProjectID string Providers map[string]provider.Provider } -func NewSubscriber(opts *Opts) (*Subscriber, error) { +// NewPubsubSubscriber - create new pubsub subscriber +func NewPubsubSubscriber(opts *Opts) (*PubsubSubscriber, error) { client, err := pubsub.NewClient(context.Background(), opts.ProjectID) if err != nil { return nil, err } - return &Subscriber{ + return &PubsubSubscriber{ project: opts.ProjectID, providers: opts.Providers, client: client, @@ -54,7 +57,7 @@ type Message struct { Tag string `json:"tag,omitempty"` } -func (s *Subscriber) ensureTopic(ctx context.Context, id string) error { +func (s *PubsubSubscriber) ensureTopic(ctx context.Context, id string) error { topic := s.client.Topic(id) exists, err := topic.Exists(ctx) if err != nil { @@ -72,7 +75,7 @@ func (s *Subscriber) ensureTopic(ctx context.Context, id string) error { return err } -func (s *Subscriber) ensureSubscription(ctx context.Context, subscriptionID, topicID string) error { +func (s *PubsubSubscriber) ensureSubscription(ctx context.Context, subscriptionID, topicID string) error { sub := s.client.Subscription(subscriptionID) exists, err := sub.Exists(ctx) if err != nil { @@ -93,8 +96,8 @@ func (s *Subscriber) ensureSubscription(ctx context.Context, subscriptionID, top return fmt.Errorf("failed to create subscription %s, error: %s", subscriptionID, err) } -// Subscribe - initiate subscriber -func (s *Subscriber) Subscribe(ctx context.Context, topic, subscription string) error { +// Subscribe - initiate PubsubSubscriber +func (s *PubsubSubscriber) Subscribe(ctx context.Context, topic, subscription string) error { // ensuring that topic exists err := s.ensureTopic(ctx, topic) if err != nil { @@ -110,7 +113,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic, subscription string) log.WithFields(log.Fields{ "topic": topic, "subscription": subscription, - }).Info("trigger.pubsub: subscribing for events...") + }).Info("trigger.pubsub: subscribing for events...") err = sub.Receive(ctx, s.callback) if err != nil { log.WithFields(log.Fields{ @@ -120,7 +123,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic, subscription string) return err } -func (s *Subscriber) callback(ctx context.Context, msg *pubsub.Message) { +func (s *PubsubSubscriber) callback(ctx context.Context, msg *pubsub.Message) { // disable ack, useful for testing if !s.disableAck { defer msg.Ack() diff --git a/trigger/pubsub/pubsub_test.go b/trigger/pubsub/pubsub_test.go index 12145f3e1..f62fb0f77 100644 --- a/trigger/pubsub/pubsub_test.go +++ b/trigger/pubsub/pubsub_test.go @@ -35,7 +35,7 @@ func fakeDoneFunc(id string, done bool) { func TestCallback(t *testing.T) { fp := &fakeProvider{} - sub := &Subscriber{disableAck: true, providers: map[string]provider.Provider{ + sub := &PubsubSubscriber{disableAck: true, providers: map[string]provider.Provider{ fp.GetName(): fp, }}