Cleanup log, pre/post install hooks with extended watch
(cherry picked from commit 0e0141b)
distorhead authored and Qiu Yu committed Apr 27, 2018
1 parent 3c234a8 commit bfa37bc
Showing 5 changed files with 73 additions and 67 deletions.
3 changes: 2 additions & 1 deletion pkg/helm/client.go
@@ -388,6 +388,7 @@ func (h *Client) install(ctx context.Context, req *rls.InstallReleaseRequest) (*
}
}
formatJobHeader := func(jobName string, podName string, containerName string) string {
// tail -f on multiple files prints similar headers
return fmt.Sprintf("==> Job \"%s\", Pod \"%s\", Container \"%s\" <==", jobName, podName, containerName)
}

@@ -414,7 +415,7 @@ func (h *Client) install(ctx context.Context, req *rls.InstallReleaseRequest) (*

setLogHeader(formatJobHeader(jobPodError.JobName, jobPodError.PodName, jobPodError.ContainerName))

fmt.Fprintf(os.Stderr, "ERROR: %s", jobPodError.Message)
fmt.Fprintf(os.Stderr, "Error: %s\n", jobPodError.Message)
} else {
finalResp = resp // TODO verify/debug this code
}
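
The comment added above formatJobHeader points at the intent: when logs from several job containers are interleaved on one terminal, each switch is announced with a header in the style tail -f uses when following multiple files. Below is a standalone sketch of the resulting output; the job, pod, and container names are invented for illustration.

    package main

    import "fmt"

    // Same shape as the formatJobHeader helper in pkg/helm/client.go.
    func formatJobHeader(jobName, podName, containerName string) string {
        return fmt.Sprintf("==> Job %q, Pod %q, Container %q <==", jobName, podName, containerName)
    }

    func main() {
        // Hypothetical names, for illustration only.
        fmt.Println(formatJobHeader("db-migrate", "db-migrate-x7k2p", "migrate"))
        // Prints: ==> Job "db-migrate", Pod "db-migrate-x7k2p", Container "migrate" <==
    }
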
43 changes: 20 additions & 23 deletions pkg/kube/job.go
@@ -198,15 +198,20 @@ func (pod *PodWatchMonitor) Watch() error {
}

_, err = watch.Until(pod.Timeout, watcher, func(e watch.Event) (bool, error) {
pod.Kube.Log("[DEBUG] Pod %s event: %+v", pod.ResourceName, e)

object, ok := e.Object.(*core.Pod)
if !ok {
return true, fmt.Errorf("Expected %s to be a *core.Pod, got %T", pod.ResourceName, e.Object)
}

// TODO: enable InitContainerStatuses
allContainerStatuses := make([]core.ContainerStatus, 0)
for _, cs := range object.Status.InitContainerStatuses {
allContainerStatuses = append(allContainerStatuses, cs)
}
for _, cs := range object.Status.ContainerStatuses {
allContainerStatuses = append(allContainerStatuses, cs)
}

for _, cs := range allContainerStatuses {
oldState := pod.ContainerMonitorStates[cs.Name]

if cs.State.Waiting != nil {
@@ -252,6 +257,8 @@ type JobWatchMonitor struct {
PodError chan PodError

MonitoredPods []*PodWatchMonitor

FinalJobStatus batch.JobStatus
}

func (job *JobWatchMonitor) Watch() error {
@@ -271,18 +278,15 @@ func (job *JobWatchMonitor) Watch() error {
}

_, err = watch.Until(job.Timeout, watcher, func(e watch.Event) (bool, error) {
job.Kube.Log("[DEBUG] Job %s event: %+v", job.ResourceName, e)

switch job.State {
case "":
if e.Type == watch.Added {
job.Started <- true

oldState := job.State
job.State = "Started"
job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State)

job.Kube.Log("[DEBUG] Starting job %s pods watcher", job.ResourceName)
job.Kube.Log("Starting to watch job %s pods", job.ResourceName)

go func() {
err := job.WatchPods()
if err != nil {
@@ -299,19 +303,15 @@ func (job *JobWatchMonitor) Watch() error {

for _, c := range object.Status.Conditions {
if c.Type == batch.JobComplete && c.Status == core.ConditionTrue {
oldState := job.State
job.State = "Succeeded"
job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State)

job.Kube.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", job.ResourceName, object.Status.Active, object.Status.Failed, object.Status.Succeeded)
job.FinalJobStatus = object.Status

job.Succeeded <- true

return true, nil
} else if c.Type == batch.JobFailed && c.Status == core.ConditionTrue {
oldState := job.State
job.State = "Failed"
job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State)

return true, fmt.Errorf("Job failed: %s", c.Reason)
}
@@ -359,10 +359,8 @@ func (job *JobWatchMonitor) WatchPods() error {
return err
}

// TODO calculate timeout since job-watch started
// TODO: calculate timeout since job-watch started
_, err = watch.Until(job.Timeout, podListWatcher, func(e watch.Event) (bool, error) {
job.Kube.Log("[DEBUG] Job %s pods list event: %+v", job.ResourceName, e)

podObject, ok := e.Object.(*core.Pod)
if !ok {
return true, fmt.Errorf("Expected %s to be a *core.Pod, got %T", job.ResourceName, e.Object)
@@ -375,15 +373,15 @@ func (job *JobWatchMonitor) WatchPods() error {
}
}

// TODO constructor from job & podObject
// TODO: constructor from job & podObject
pod := &PodWatchMonitor{
WatchMonitor: WatchMonitor{
Kube: job.Kube,
Timeout: job.Timeout,

Namespace: job.Namespace,
ResourceName: podObject.Name,
InitialResourceVersion: "",
InitialResourceVersion: "", // this will make PodWatchMonitor receive podObject again and handle its state properly by itself
},

PodLogChunk: job.PodLogChunk,
@@ -418,18 +416,18 @@ func (job *JobWatchMonitor) WatchPods() error {
return nil
}

func (c *Client) WatchJobsTillDone(namespace string, reader io.Reader, watchFeed WatchFeed, timeout time.Duration) error {
func (c *Client) WatchJobsUntilReady(namespace string, reader io.Reader, watchFeed WatchFeed, timeout time.Duration) error {
infos, err := c.Build(namespace, reader)
if err != nil {
return err
}

return perform(infos, func(info *resource.Info) error {
return c.watchJobTillDone(info, watchFeed, timeout)
return c.watchJobUntilReady(info, watchFeed, timeout)
})
}

func (c *Client) watchJobTillDone(jobInfo *resource.Info, watchFeed WatchFeed, timeout time.Duration) error {
func (c *Client) watchJobUntilReady(jobInfo *resource.Info, watchFeed WatchFeed, timeout time.Duration) error {
if jobInfo.Mapping.GroupVersionKind.Kind != "Job" {
return nil
}
@@ -454,7 +452,6 @@ func (c *Client) watchJobTillDone(jobInfo *resource.Info, watchFeed WatchFeed, t
Error: make(chan error, 0),
}

c.Log("[DEBUG] Starting job %s watcher", job.ResourceName)
go func() {
err := job.Watch()
if err != nil {
@@ -467,7 +464,7 @@ func (c *Client) watchJobTillDone(jobInfo *resource.Info, watchFeed WatchFeed, t
case <-job.Started:
c.Log("Job %s started", job.ResourceName)
case <-job.Succeeded:
c.Log("Job %s succeeded", job.ResourceName)
c.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", job.ResourceName, job.FinalJobStatus.Active, job.FinalJobStatus.Failed, job.FinalJobStatus.Succeeded)
return nil
case err := <-job.Error:
return err
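
The "TODO: calculate timeout since job-watch started" note above observes that the nested watches (the job watch, the pod-list watch, and each PodWatchMonitor) are all given the same full Timeout, even though some of them start later. A minimal sketch of one way to address it with a shared deadline follows; the startedAt field and remainingTimeout helper are hypothetical, not part of this commit.

    package kube

    import "time"

    // Hypothetical variant of WatchMonitor that remembers when the top-level
    // job watch began, so nested watches only get the unused remainder.
    type WatchMonitor struct {
        Namespace              string
        ResourceName           string
        InitialResourceVersion string

        Timeout   time.Duration
        startedAt time.Time // set once, when JobWatchMonitor.Watch starts
    }

    // remainingTimeout returns what is left of the original budget; a pod
    // watcher spawned a minute into the job watch no longer restarts the clock.
    func (m *WatchMonitor) remainingTimeout() time.Duration {
        if m.startedAt.IsZero() {
            return m.Timeout
        }
        if left := m.Timeout - time.Since(m.startedAt); left > 0 {
            return left
        }
        return 0
    }

With something like this in place, the watch.Until calls would take m.remainingTimeout() instead of job.Timeout.
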
2 changes: 1 addition & 1 deletion pkg/tiller/environment/environment.go
@@ -144,7 +144,7 @@ type KubeClient interface {
// and returns said phase (PodSucceeded or PodFailed qualify).
WaitAndGetCompletedPodPhase(namespace string, reader io.Reader, timeout time.Duration) (core.PodPhase, error)

WatchJobsTillDone(namespace string, reader io.Reader, watchFeed kube.WatchFeed, timeout time.Duration) error
WatchJobsUntilReady(namespace string, reader io.Reader, watchFeed kube.WatchFeed, timeout time.Duration) error
}

// PrintingKubeClient implements KubeClient, but simply prints the reader to
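
Because WatchJobsUntilReady is part of the KubeClient interface, every implementation has to grow the method, including the PrintingKubeClient mentioned just above. A hedged stub is sketched below, assuming PrintingKubeClient keeps its documented behaviour of simply printing the reader; the Out field and that copy behaviour are assumptions, since its definition is not shown in this diff.

    package environment

    import (
        "io"
        "time"

        "k8s.io/helm/pkg/kube"
    )

    // Assumed shape of PrintingKubeClient; the real definition is not part of
    // this diff.
    type PrintingKubeClient struct {
        Out io.Writer
    }

    // WatchJobsUntilReady just dumps the manifests, mirroring how the other
    // no-op methods of PrintingKubeClient are described.
    func (p *PrintingKubeClient) WatchJobsUntilReady(namespace string, reader io.Reader, watchFeed kube.WatchFeed, timeout time.Duration) error {
        _, err := io.Copy(p.Out, reader)
        return err
    }
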
80 changes: 39 additions & 41 deletions pkg/tiller/release_install.go
@@ -144,6 +144,44 @@ func (s *ReleaseServer) prepareRelease(req *services.InstallReleaseRequest) (*re
func (s *ReleaseServer) performRelease(r *release.Release, req *services.InstallReleaseRequest, stream services.ReleaseService_InstallReleaseServer) (*services.InstallReleaseResponse, error) {
res := &services.InstallReleaseResponse{Release: r}

watchFeed := &kube.WatchFeedProto{
WriteJobLogChunkFunc: func(chunk kube.JobLogChunk) error {
chunkResp := &services.InstallReleaseResponse{
WatchFeed: &release.WatchFeed{
JobLogChunk: &release.JobLogChunk{
JobName: chunk.JobName,
PodName: chunk.PodName,
ContainerName: chunk.ContainerName,
LogLines: make([]*release.LogLine, 0),
},
},
}

for _, line := range chunk.LogLines {
ll := &release.LogLine{
Timestamp: line.Timestamp,
Data: line.Data,
}
chunkResp.WatchFeed.JobLogChunk.LogLines = append(chunkResp.WatchFeed.JobLogChunk.LogLines, ll)
}

return stream.Send(chunkResp)
},
WriteJobPodErrorFunc: func(obj kube.JobPodError) error {
chunkResp := &services.InstallReleaseResponse{
WatchFeed: &release.WatchFeed{
JobPodError: &release.JobPodError{
JobName: obj.JobName,
PodName: obj.PodName,
ContainerName: obj.ContainerName,
Message: obj.Message,
},
},
}
return stream.Send(chunkResp)
},
}

if req.DryRun {
s.Log("dry run for %s", r.Name)
res.Release.Info.Description = "Dry run complete"
@@ -152,46 +190,6 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install

// pre-install hooks
if !req.DisableHooks {
watchFeed := &kube.WatchFeedProto{
WriteJobLogChunkFunc: func(chunk kube.JobLogChunk) error {
chunkResp := &services.InstallReleaseResponse{
WatchFeed: &release.WatchFeed{
JobLogChunk: &release.JobLogChunk{
JobName: chunk.JobName,
PodName: chunk.PodName,
ContainerName: chunk.ContainerName,
LogLines: make([]*release.LogLine, 0),
},
},
}

for _, line := range chunk.LogLines {
ll := &release.LogLine{
Timestamp: line.Timestamp,
Data: line.Data,
}
chunkResp.WatchFeed.JobLogChunk.LogLines = append(chunkResp.WatchFeed.JobLogChunk.LogLines, ll)
}

return stream.Send(chunkResp)
},
WriteJobPodErrorFunc: func(obj kube.JobPodError) error {
chunkResp := &services.InstallReleaseResponse{
WatchFeed: &release.WatchFeed{
JobPodError: &release.JobPodError{
JobName: obj.JobName,
PodName: obj.PodName,
ContainerName: obj.ContainerName,
Message: obj.Message,
},
},
}
return stream.Send(chunkResp)
},
}

// TODO watch the job with the feed only if the job has the annotation "helm/watch-logs": "true"
// TODO otherwise watch it as an ordinary hook, just like before, using WatchUntilReady
if err := s.execHookWithWatchFeed(r.Hooks, r.Name, r.Namespace, hooks.PreInstall, req.Timeout, watchFeed); err != nil {
return res, err
}
@@ -249,7 +247,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install

// post-install hooks
if !req.DisableHooks {
if err := s.execHook(r.Hooks, r.Name, r.Namespace, hooks.PostInstall, req.Timeout); err != nil {
if err := s.execHookWithWatchFeed(r.Hooks, r.Name, r.Namespace, hooks.PostInstall, req.Timeout, watchFeed); err != nil {
msg := fmt.Sprintf("Release %q failed post-install: %s", r.Name, err)
s.Log("warning: %s", msg)
r.Info.Status.Code = release.Status_FAILED
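
The watchFeed built at the top of performRelease is hoisted out of the pre-install block so the pre- and post-install hook calls share one feed. WatchFeedProto itself appears to be the function-field adapter pattern: a struct of callbacks that satisfies the kube.WatchFeed interface, letting Tiller stream chunks through closures over the gRPC stream instead of defining a named type. A self-contained illustration of that pattern follows; the types here are simplified stand-ins, not the real kube package.

    package main

    import "fmt"

    // Simplified stand-ins for kube.JobLogChunk and kube.WatchFeed.
    type JobLogChunk struct {
        JobName  string
        LogLines []string
    }

    type WatchFeed interface {
        WriteJobLogChunk(JobLogChunk) error
    }

    // WatchFeedProto-style adapter: the function field satisfies the interface.
    type WatchFeedProto struct {
        WriteJobLogChunkFunc func(JobLogChunk) error
    }

    func (p *WatchFeedProto) WriteJobLogChunk(c JobLogChunk) error {
        return p.WriteJobLogChunkFunc(c)
    }

    func main() {
        // The closure plays the role of stream.Send in release_install.go.
        feed := &WatchFeedProto{
            WriteJobLogChunkFunc: func(c JobLogChunk) error {
                for _, line := range c.LogLines {
                    fmt.Printf("[%s] %s\n", c.JobName, line)
                }
                return nil
            },
        }

        _ = feed.WriteJobLogChunk(JobLogChunk{
            JobName:  "db-migrate", // hypothetical name
            LogLines: []string{"applying migration 001", "done"},
        })
    }
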
12 changes: 11 additions & 1 deletion pkg/tiller/release_server.go
@@ -382,7 +382,17 @@ func (s *ReleaseServer) execHookWithWatchFeed(hs []*release.Hook, name, namespac
b.Reset()
b.WriteString(h.Manifest)

if err := kubeCli.WatchJobsTillDone(namespace, b, watchFeed, time.Duration(timeout)*time.Second); err != nil {
var err error
if true {
// TODO: Watch with watchFeed only if the helm/watch=true annotation is set;
// TODO: because this code is new and experimental, WatchUntilReady
// TODO: will be used by default.
err = kubeCli.WatchJobsUntilReady(namespace, b, watchFeed, time.Duration(timeout)*time.Second)
} else {
err = kubeCli.WatchUntilReady(namespace, b, timeout, false)
}

if err != nil {
s.Log("warning: Release %s %s %s could not complete: %s", name, hook, h.Path, err)
// If a hook fails, check the hook's annotations to determine whether the hook should be deleted
// on failure. If so, clear the corresponding resource object in the hook
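
The "if true" placeholder and its TODO say the extended watch should eventually be opt-in per hook via an annotation (the removed comment in release_install.go mentions "helm/watch-logs": "true", the new one helm/watch=true). One hedged way the gate could look is sketched below, assuming the annotation lives in the hook manifest's metadata and that a YAML unmarshal is acceptable at this point; hookWantsWatchFeed and the exact annotation key are hypothetical, not part of this commit.

    package tiller

    import "gopkg.in/yaml.v2"

    // hookWantsWatchFeed reports whether a hook manifest opts into the
    // extended job watch. Sketch only; not part of this commit.
    func hookWantsWatchFeed(manifest string) bool {
        var parsed struct {
            Metadata struct {
                Annotations map[string]string `yaml:"annotations"`
            } `yaml:"metadata"`
        }
        if err := yaml.Unmarshal([]byte(manifest), &parsed); err != nil {
            return false // unparsable manifest: fall back to the default watch
        }
        return parsed.Metadata.Annotations["helm/watch-logs"] == "true"
    }

The "if true" above would then become a call like hookWantsWatchFeed(h.Manifest), with the WatchUntilReady branch remaining the default.
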
