From bfa37bc5ae9119190eaf8abe6ead5027cecec63e Mon Sep 17 00:00:00 2001 From: Timofey Kirillov Date: Wed, 21 Feb 2018 21:29:32 +0300 Subject: [PATCH] Cleanup log, pre/post install hooks with extended watch (cherry picked from commit 0e0141b40e5ce0428013873d099757ffbe403e3e) --- pkg/helm/client.go | 3 +- pkg/kube/job.go | 43 +++++++------- pkg/tiller/environment/environment.go | 2 +- pkg/tiller/release_install.go | 80 +++++++++++++-------------- pkg/tiller/release_server.go | 12 +++- 5 files changed, 73 insertions(+), 67 deletions(-) diff --git a/pkg/helm/client.go b/pkg/helm/client.go index dd4637f814a..7923a6bb370 100644 --- a/pkg/helm/client.go +++ b/pkg/helm/client.go @@ -388,6 +388,7 @@ func (h *Client) install(ctx context.Context, req *rls.InstallReleaseRequest) (* } } formatJobHeader := func(jobName string, podName string, containerName string) string { + // tail -f on multiple files prints similar headers return fmt.Sprintf("==> Job \"%s\", Pod \"%s\", Container \"%s\" <==", jobName, podName, containerName) } @@ -414,7 +415,7 @@ func (h *Client) install(ctx context.Context, req *rls.InstallReleaseRequest) (* setLogHeader(formatJobHeader(jobPodError.JobName, jobPodError.PodName, jobPodError.ContainerName)) - fmt.Fprintf(os.Stderr, "ERROR: %s", jobPodError.Message) + fmt.Fprintf(os.Stderr, "Error: %s\n", jobPodError.Message) } else { finalResp = resp // TODO verify/debug this code } diff --git a/pkg/kube/job.go b/pkg/kube/job.go index 3882a135149..ac0aea2dcd1 100644 --- a/pkg/kube/job.go +++ b/pkg/kube/job.go @@ -198,15 +198,20 @@ func (pod *PodWatchMonitor) Watch() error { } _, err = watch.Until(pod.Timeout, watcher, func(e watch.Event) (bool, error) { - pod.Kube.Log("[DEBUG] Pod %s event: %+v", pod.ResourceName, e) - object, ok := e.Object.(*core.Pod) if !ok { return true, fmt.Errorf("Expected %s to be a *core.Pod, got %T", pod.ResourceName, e.Object) } - // TODO: enable InitContainerStatuses + allContainerStatuses := make([]core.ContainerStatus, 0) + for _, cs := range object.Status.InitContainerStatuses { + allContainerStatuses = append(allContainerStatuses, cs) + } for _, cs := range object.Status.ContainerStatuses { + allContainerStatuses = append(allContainerStatuses, cs) + } + + for _, cs := range allContainerStatuses { oldState := pod.ContainerMonitorStates[cs.Name] if cs.State.Waiting != nil { @@ -252,6 +257,8 @@ type JobWatchMonitor struct { PodError chan PodError MonitoredPods []*PodWatchMonitor + + FinalJobStatus batch.JobStatus } func (job *JobWatchMonitor) Watch() error { @@ -271,18 +278,15 @@ func (job *JobWatchMonitor) Watch() error { } _, err = watch.Until(job.Timeout, watcher, func(e watch.Event) (bool, error) { - job.Kube.Log("[DEBUG] Job %s event: %+v", job.ResourceName, e) - switch job.State { case "": if e.Type == watch.Added { job.Started <- true - oldState := job.State job.State = "Started" - job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State) - job.Kube.Log("[DEBUG] Starting job %s pods watcher", job.ResourceName) + job.Kube.Log("Starting to watch job %s pods", job.ResourceName) + go func() { err := job.WatchPods() if err != nil { @@ -299,19 +303,15 @@ func (job *JobWatchMonitor) Watch() error { for _, c := range object.Status.Conditions { if c.Type == batch.JobComplete && c.Status == core.ConditionTrue { - oldState := job.State job.State = "Succeeded" - job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State) - job.Kube.Log("%s: Jobs active: %d, jobs failed: %d, jobs 
succeeded: %d", job.ResourceName, object.Status.Active, object.Status.Failed, object.Status.Succeeded) + job.FinalJobStatus = object.Status job.Succeeded <- true return true, nil } else if c.Type == batch.JobFailed && c.Status == core.ConditionTrue { - oldState := job.State job.State = "Failed" - job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State) return true, fmt.Errorf("Job failed: %s", c.Reason) } @@ -359,10 +359,8 @@ func (job *JobWatchMonitor) WatchPods() error { return err } - // TODO calculate timeout since job-watch started + // TODO: calculate timeout since job-watch started _, err = watch.Until(job.Timeout, podListWatcher, func(e watch.Event) (bool, error) { - job.Kube.Log("[DEBUG] Job %s pods list event: %+v", job.ResourceName, e) - podObject, ok := e.Object.(*core.Pod) if !ok { return true, fmt.Errorf("Expected %s to be a *core.Pod, got %T", job.ResourceName, e.Object) @@ -375,7 +373,7 @@ func (job *JobWatchMonitor) WatchPods() error { } } - // TODO constructor from job & podObject + // TODO: constructor from job & podObject pod := &PodWatchMonitor{ WatchMonitor: WatchMonitor{ Kube: job.Kube, @@ -383,7 +381,7 @@ func (job *JobWatchMonitor) WatchPods() error { Namespace: job.Namespace, ResourceName: podObject.Name, - InitialResourceVersion: "", + InitialResourceVersion: "", // this will make PodWatchMonitor receive podObject again and handle its state properly by itself }, PodLogChunk: job.PodLogChunk, @@ -418,18 +416,18 @@ func (job *JobWatchMonitor) WatchPods() error { return nil } -func (c *Client) WatchJobsTillDone(namespace string, reader io.Reader, watchFeed WatchFeed, timeout time.Duration) error { +func (c *Client) WatchJobsUntilReady(namespace string, reader io.Reader, watchFeed WatchFeed, timeout time.Duration) error { infos, err := c.Build(namespace, reader) if err != nil { return err } return perform(infos, func(info *resource.Info) error { - return c.watchJobTillDone(info, watchFeed, timeout) + return c.watchJobUntilReady(info, watchFeed, timeout) }) } -func (c *Client) watchJobTillDone(jobInfo *resource.Info, watchFeed WatchFeed, timeout time.Duration) error { +func (c *Client) watchJobUntilReady(jobInfo *resource.Info, watchFeed WatchFeed, timeout time.Duration) error { if jobInfo.Mapping.GroupVersionKind.Kind != "Job" { return nil } @@ -454,7 +452,6 @@ func (c *Client) watchJobTillDone(jobInfo *resource.Info, watchFeed WatchFeed, t Error: make(chan error, 0), } - c.Log("[DEBUG] Starting job %s watcher", job.ResourceName) go func() { err := job.Watch() if err != nil { @@ -467,7 +464,7 @@ func (c *Client) watchJobTillDone(jobInfo *resource.Info, watchFeed WatchFeed, t case <-job.Started: c.Log("Job %s started", job.ResourceName) case <-job.Succeeded: - c.Log("Job %s succeeded", job.ResourceName) + c.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", job.ResourceName, job.FinalJobStatus.Active, job.FinalJobStatus.Failed, job.FinalJobStatus.Succeeded) return nil case err := <-job.Error: return err diff --git a/pkg/tiller/environment/environment.go b/pkg/tiller/environment/environment.go index 5af97a49f64..3839dcc29b9 100644 --- a/pkg/tiller/environment/environment.go +++ b/pkg/tiller/environment/environment.go @@ -144,7 +144,7 @@ type KubeClient interface { // and returns said phase (PodSucceeded or PodFailed qualify). 
WaitAndGetCompletedPodPhase(namespace string, reader io.Reader, timeout time.Duration) (core.PodPhase, error) - WatchJobsTillDone(namespace string, reader io.Reader, watchFeed kube.WatchFeed, timeout time.Duration) error + WatchJobsUntilReady(namespace string, reader io.Reader, watchFeed kube.WatchFeed, timeout time.Duration) error } // PrintingKubeClient implements KubeClient, but simply prints the reader to diff --git a/pkg/tiller/release_install.go b/pkg/tiller/release_install.go index da8cf2be87d..0ed43d8d6fb 100644 --- a/pkg/tiller/release_install.go +++ b/pkg/tiller/release_install.go @@ -144,6 +144,44 @@ func (s *ReleaseServer) prepareRelease(req *services.InstallReleaseRequest) (*re func (s *ReleaseServer) performRelease(r *release.Release, req *services.InstallReleaseRequest, stream services.ReleaseService_InstallReleaseServer) (*services.InstallReleaseResponse, error) { res := &services.InstallReleaseResponse{Release: r} + watchFeed := &kube.WatchFeedProto{ + WriteJobLogChunkFunc: func(chunk kube.JobLogChunk) error { + chunkResp := &services.InstallReleaseResponse{ + WatchFeed: &release.WatchFeed{ + JobLogChunk: &release.JobLogChunk{ + JobName: chunk.JobName, + PodName: chunk.PodName, + ContainerName: chunk.ContainerName, + LogLines: make([]*release.LogLine, 0), + }, + }, + } + + for _, line := range chunk.LogLines { + ll := &release.LogLine{ + Timestamp: line.Timestamp, + Data: line.Data, + } + chunkResp.WatchFeed.JobLogChunk.LogLines = append(chunkResp.WatchFeed.JobLogChunk.LogLines, ll) + } + + return stream.Send(chunkResp) + }, + WriteJobPodErrorFunc: func(obj kube.JobPodError) error { + chunkResp := &services.InstallReleaseResponse{ + WatchFeed: &release.WatchFeed{ + JobPodError: &release.JobPodError{ + JobName: obj.JobName, + PodName: obj.PodName, + ContainerName: obj.ContainerName, + Message: obj.Message, + }, + }, + } + return stream.Send(chunkResp) + }, + } + if req.DryRun { s.Log("dry run for %s", r.Name) res.Release.Info.Description = "Dry run complete" @@ -152,46 +190,6 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install // pre-install hooks if !req.DisableHooks { - watchFeed := &kube.WatchFeedProto{ - WriteJobLogChunkFunc: func(chunk kube.JobLogChunk) error { - chunkResp := &services.InstallReleaseResponse{ - WatchFeed: &release.WatchFeed{ - JobLogChunk: &release.JobLogChunk{ - JobName: chunk.JobName, - PodName: chunk.PodName, - ContainerName: chunk.ContainerName, - LogLines: make([]*release.LogLine, 0), - }, - }, - } - - for _, line := range chunk.LogLines { - ll := &release.LogLine{ - Timestamp: line.Timestamp, - Data: line.Data, - } - chunkResp.WatchFeed.JobLogChunk.LogLines = append(chunkResp.WatchFeed.JobLogChunk.LogLines, ll) - } - - return stream.Send(chunkResp) - }, - WriteJobPodErrorFunc: func(obj kube.JobPodError) error { - chunkResp := &services.InstallReleaseResponse{ - WatchFeed: &release.WatchFeed{ - JobPodError: &release.JobPodError{ - JobName: obj.JobName, - PodName: obj.PodName, - ContainerName: obj.ContainerName, - Message: obj.Message, - }, - }, - } - return stream.Send(chunkResp) - }, - } - - // TODO watch job with feed only if job have annotation "helm/watch-logs": "true" - // TODO otherwise watch as ordinary hook just like before, using WatchUntilReady if err := s.execHookWithWatchFeed(r.Hooks, r.Name, r.Namespace, hooks.PreInstall, req.Timeout, watchFeed); err != nil { return res, err } @@ -249,7 +247,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install // post-install hooks if 
!req.DisableHooks {
-		if err := s.execHook(r.Hooks, r.Name, r.Namespace, hooks.PostInstall, req.Timeout); err != nil {
+		if err := s.execHookWithWatchFeed(r.Hooks, r.Name, r.Namespace, hooks.PostInstall, req.Timeout, watchFeed); err != nil {
 			msg := fmt.Sprintf("Release %q failed post-install: %s", r.Name, err)
 			s.Log("warning: %s", msg)
 			r.Info.Status.Code = release.Status_FAILED
diff --git a/pkg/tiller/release_server.go b/pkg/tiller/release_server.go
index 33ee958689c..e4a026f2b49 100644
--- a/pkg/tiller/release_server.go
+++ b/pkg/tiller/release_server.go
@@ -382,7 +382,17 @@ func (s *ReleaseServer) execHookWithWatchFeed(hs []*release.Hook, name, namespac
 		b.Reset()
 		b.WriteString(h.Manifest)
 
-		if err := kubeCli.WatchJobsTillDone(namespace, b, watchFeed, time.Duration(timeout)*time.Second); err != nil {
+		var err error
+		if true {
+			// TODO: watch with watchFeed only when the helm/watch=true annotation is set;
+			// TODO: this code is new and experimental, so WatchUntilReady should remain
+			// TODO: the default behaviour.
+			err = kubeCli.WatchJobsUntilReady(namespace, b, watchFeed, time.Duration(timeout)*time.Second)
+		} else {
+			err = kubeCli.WatchUntilReady(namespace, b, timeout, false)
+		}
+
+		if err != nil {
 			s.Log("warning: Release %s %s %s could not complete: %s", name, hook, h.Path, err)
 			// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
 			// under failed condition. If so, then clear the corresponding resource object in the hook
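
Not part of the patch: below is a minimal, hypothetical sketch of how a caller might wire up the kube.WatchFeedProto callbacks introduced above and hand them to the renamed WatchJobsUntilReady method. The printingWatchFeed and watchHookJobs helpers are illustrative names only; an already configured *kube.Client, the k8s.io/helm import path, and the exported field names shown in the hunks are assumed.

package main

import (
	"bytes"
	"fmt"
	"time"

	"k8s.io/helm/pkg/kube"
)

// printingWatchFeed builds a kube.WatchFeedProto whose callbacks simply
// print job log chunks and pod errors to stdout, mirroring the data that
// tiller streams back to the helm client in this patch.
func printingWatchFeed() *kube.WatchFeedProto {
	return &kube.WatchFeedProto{
		WriteJobLogChunkFunc: func(chunk kube.JobLogChunk) error {
			for _, line := range chunk.LogLines {
				fmt.Printf("==> Job %q, Pod %q, Container %q <== %v %s\n",
					chunk.JobName, chunk.PodName, chunk.ContainerName,
					line.Timestamp, line.Data)
			}
			return nil
		},
		WriteJobPodErrorFunc: func(podErr kube.JobPodError) error {
			fmt.Printf("==> Job %q, Pod %q, Container %q <== Error: %s\n",
				podErr.JobName, podErr.PodName, podErr.ContainerName, podErr.Message)
			return nil
		},
	}
}

// watchHookJobs feeds a rendered hook manifest into WatchJobsUntilReady,
// reporting job progress through the printing feed above; client is
// assumed to be an already configured *kube.Client.
func watchHookJobs(client *kube.Client, namespace, manifest string, timeoutSeconds int64) error {
	return client.WatchJobsUntilReady(namespace, bytes.NewBufferString(manifest),
		printingWatchFeed(), time.Duration(timeoutSeconds)*time.Second)
}

Keeping the callbacks in a plain struct of function fields (WatchFeedProto) lets a caller opt into only the events it cares about instead of implementing a full interface, which seems to be the reason the patch threads kube.WatchFeed through the hook execution path.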