From f4c5b3d4f4f1e52bd75bce506696bd4c37c8d7cb Mon Sep 17 00:00:00 2001 From: Hamish Pitkeathly Date: Fri, 8 Mar 2024 09:52:47 +0000 Subject: [PATCH] Ensure done jobs get deleted periodically (#289) * Don't set lastUpdate when streaming events * Fix tests * Delete completed jobs after expiry time regardless of whether they have listeners or not * delete done jobs with listeners after resumption time * Ensure created jobs done is set to false * Improve comments and Changelog * Clean up tests --- ChangeLog | 7 +++++++ VERSION | 2 +- mettle/api/api.go | 27 +++++++++++++++++---------- mettle/api/api_test.go | 39 +++++++++++++++------------------------ 4 files changed, 40 insertions(+), 35 deletions(-) diff --git a/ChangeLog b/ChangeLog index 46a14de6..a96a5000 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,10 @@ +Version 11.9.1 +-------------- + * Delete done jobs after resumption time has passed + * Delete any job after 2x expiry + * Don't set lastUpdate on streaming events + * Add warning logs when deleting jobs with listeners + Version 11.9.0 -------------- * Revert job deletion change to match previous behaviour diff --git a/VERSION b/VERSION index ba9aff72..03a7c8d3 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -11.9.0 +11.9.1 diff --git a/mettle/api/api.go b/mettle/api/api.go index d056e27a..7cb584b1 100644 --- a/mettle/api/api.go +++ b/mettle/api/api.go @@ -385,10 +385,6 @@ func (s *server) WaitExecution(req *pb.WaitExecutionRequest, stream pb.Execution func (s *server) streamEvents(digest *pb.Digest, ch <-chan *longrunning.Operation, stream pb.Execution_ExecuteServer) error { for op := range ch { op.Name = digest.Hash - s.mutex.Lock() - j := s.jobs[digest.Hash] - j.LastUpdate = time.Now() - s.mutex.Unlock() if err := stream.Send(op); err != nil { log.Warning("Failed to forward event for %s: %s", digest.Hash, err) s.stopStream(digest, ch) @@ -436,6 +432,7 @@ func (s *server) eventStream(digest *pb.Digest, create bool) (<-chan *longrunnin j.Current = nil 
j.StartTime = time.Now() j.LastUpdate = time.Now() + j.Done = false } else if j.Current != nil { // This request is resuming an existing stream, give them an update on the latest thing to happen. // This helps avoid 504s from taking too long to send response headers since it can be an arbitrary @@ -584,7 +581,7 @@ func (s *server) periodicallyDeleteJobs() { s.mutex.Lock() startTime := time.Now() for digest, job := range s.jobs { - if shouldDeleteJob(job) { + if shouldDeleteJob(job, digest) { delete(s.jobs, digest) } } @@ -593,15 +590,25 @@ func (s *server) periodicallyDeleteJobs() { } } -func shouldDeleteJob(j *job) bool { +func shouldDeleteJob(j *job, digest string) bool { timeSinceLastUpdate := time.Since(j.LastUpdate) - if j.Done && len(j.Streams) == 0 && timeSinceLastUpdate > retentionTime { - return true + if len(j.Streams) == 0 { + if j.Done && timeSinceLastUpdate > retentionTime { + // Job is complete with no listeners, safe to delete + return true + } else if !j.Done && timeSinceLastUpdate > expiryTime { + // Job is incomplete but with no listeners, safe to delete + return true + } } - if !j.Done && len(j.Streams) == 0 && timeSinceLastUpdate > expiryTime { + if j.Done && timeSinceLastUpdate > resumptionTime { + // Job is done and old enough that it is considered stale, safe to delete + log.Warningf("Will delete job %s: Done: %t, listeners: %d, lastUpdate: %v", digest, j.Done, len(j.Streams), j.LastUpdate) return true } - if !j.Done && timeSinceLastUpdate > 2*expiryTime { + if timeSinceLastUpdate > 2*expiryTime { + // Job hasn't had an update in forever, safe to delete + log.Warningf("Will delete job %s: Done: %t, listeners: %d, lastUpdate: %v ", digest, j.Done, len(j.Streams), j.LastUpdate) return true } return false diff --git a/mettle/api/api_test.go b/mettle/api/api_test.go index a6a4f363..064b1e6d 100644 --- a/mettle/api/api_test.go +++ b/mettle/api/api_test.go @@ -206,7 +206,7 @@ func TestShouldDeleteJob(t *testing.T) { shouldDelete bool }{ { - name: 
"incomplete job returns false", + name: "incomplete job doesn't expire", job: &job{ Done: false, LastUpdate: now.Add(-1 * time.Minute), @@ -214,7 +214,7 @@ func TestShouldDeleteJob(t *testing.T) { shouldDelete: false, }, { - name: "completed job within retention time returns false", + name: "completed job within retention time does not expire", job: &job{ Done: true, LastUpdate: now.Add(-1 * time.Minute), @@ -222,7 +222,7 @@ func TestShouldDeleteJob(t *testing.T) { shouldDelete: false, }, { - name: "completed job after retention time and no listeners returns true", + name: "completed job after retention time and no listeners does expire", job: &job{ Done: true, LastUpdate: now.Add(-6 * time.Minute), @@ -231,7 +231,16 @@ func TestShouldDeleteJob(t *testing.T) { shouldDelete: true, }, { - name: "incomplete job with no listeners within expiry time returns false", + name: "complete job with active listeners after 1x resumption time does expire", + job: &job{ + Done: true, + LastUpdate: now.Add(-11 * time.Minute), + Streams: []chan *longrunning.Operation{make(chan *longrunning.Operation), make(chan *longrunning.Operation)}, + }, + shouldDelete: true, + }, + { + name: "incomplete job with no listeners within expiry time does not expire", job: &job{ Done: false, LastUpdate: now.Add(-59 * time.Minute), @@ -239,22 +248,13 @@ func TestShouldDeleteJob(t *testing.T) { shouldDelete: false, }, { - name: "incomplete job with no listeners after expiry time returns true", + name: "incomplete job with no listeners after 1x expiry time does expire", job: &job{ Done: false, LastUpdate: now.Add(-61 * time.Minute), }, shouldDelete: true, }, - { - name: "complete job with active listeners after 1x expiry time does not expire", - job: &job{ - Done: true, - LastUpdate: now.Add(-121 * time.Minute), - Streams: []chan *longrunning.Operation{make(chan *longrunning.Operation), make(chan *longrunning.Operation)}, - }, - shouldDelete: false, - }, { name: "incomplete job with listeners after 2x 
expiry time does expire", job: &job{ @@ -264,20 +264,11 @@ func TestShouldDeleteJob(t *testing.T) { }, shouldDelete: true, }, - { - name: "complete job with active listeners after 1x expiry time does not expire", - job: &job{ - Done: true, - LastUpdate: now.Add(-61 * time.Minute), - Streams: []chan *longrunning.Operation{make(chan *longrunning.Operation), make(chan *longrunning.Operation)}, - }, - shouldDelete: false, - }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - assert.Equal(t, test.shouldDelete, shouldDeleteJob(test.job)) + assert.Equal(t, test.shouldDelete, shouldDeleteJob(test.job, "1234")) }) } }