Skip to content

Commit

Permalink
Ensure done jobs get deleted periodically (#289)
Browse files Browse the repository at this point in the history
* Don't set lastUpdate when streaming events

* Fix tests

* Delete completed jobs after expiry time regardless of whether they have listeners or not

* Delete done jobs with listeners after resumption time

* Ensure created jobs done is set to false

* Improve comments and Changelog

* Clean up tests
  • Loading branch information
Hamishpk authored Mar 8, 2024
1 parent fa2b0e1 commit f4c5b3d
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 35 deletions.
7 changes: 7 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
Version 11.9.1
--------------
* Delete done jobs after resumption time has passed
* Delete any job after 2x expiry
* Don't set lastUpdate on streaming events
* Add warning logs when deleting jobs with listeners

Version 11.9.0
--------------
* Revert job deletion change to match previous behaviour
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.9.0
11.9.1
27 changes: 17 additions & 10 deletions mettle/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,6 @@ func (s *server) WaitExecution(req *pb.WaitExecutionRequest, stream pb.Execution
func (s *server) streamEvents(digest *pb.Digest, ch <-chan *longrunning.Operation, stream pb.Execution_ExecuteServer) error {
for op := range ch {
op.Name = digest.Hash
s.mutex.Lock()
j := s.jobs[digest.Hash]
j.LastUpdate = time.Now()
s.mutex.Unlock()
if err := stream.Send(op); err != nil {
log.Warning("Failed to forward event for %s: %s", digest.Hash, err)
s.stopStream(digest, ch)
Expand Down Expand Up @@ -436,6 +432,7 @@ func (s *server) eventStream(digest *pb.Digest, create bool) (<-chan *longrunnin
j.Current = nil
j.StartTime = time.Now()
j.LastUpdate = time.Now()
j.Done = false
} else if j.Current != nil {
// This request is resuming an existing stream, give them an update on the latest thing to happen.
// This helps avoid 504s from taking too long to send response headers since it can be an arbitrary
Expand Down Expand Up @@ -584,7 +581,7 @@ func (s *server) periodicallyDeleteJobs() {
s.mutex.Lock()
startTime := time.Now()
for digest, job := range s.jobs {
if shouldDeleteJob(job) {
if shouldDeleteJob(job, digest) {
delete(s.jobs, digest)
}
}
Expand All @@ -593,15 +590,25 @@ func (s *server) periodicallyDeleteJobs() {
}
}

func shouldDeleteJob(j *job) bool {
func shouldDeleteJob(j *job, digest string) bool {
timeSinceLastUpdate := time.Since(j.LastUpdate)
if j.Done && len(j.Streams) == 0 && timeSinceLastUpdate > retentionTime {
return true
if len(j.Streams) == 0 {
if j.Done && timeSinceLastUpdate > retentionTime {
// Job is complete with no listeners, safe to delete
return true
} else if !j.Done && timeSinceLastUpdate > expiryTime {
// Job is incomplete but with no listeners, safe to delete
return true
}
}
if !j.Done && len(j.Streams) == 0 && timeSinceLastUpdate > expiryTime {
if j.Done && timeSinceLastUpdate > resumptionTime {
// Job is done and old enough that it is considered stale, safe to delete
log.Warningf("Will delete job %s: Done: %t, listeners: %d, lastUpdate: %v", digest, j.Done, len(j.Streams), j.LastUpdate)
return true
}
if !j.Done && timeSinceLastUpdate > 2*expiryTime {
if timeSinceLastUpdate > 2*expiryTime {
// Job hasn't had an update in forever, safe to delete
log.Warningf("Will delete job %s: Done: %t, listeners: %d, lastUpdate: %v ", digest, j.Done, len(j.Streams), j.LastUpdate)
return true
}
return false
Expand Down
39 changes: 15 additions & 24 deletions mettle/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,23 +206,23 @@ func TestShouldDeleteJob(t *testing.T) {
shouldDelete bool
}{
{
name: "incomplete job returns false",
name: "incomplete job doesn't expire",
job: &job{
Done: false,
LastUpdate: now.Add(-1 * time.Minute),
},
shouldDelete: false,
},
{
name: "completed job within retention time returns false",
name: "completed job within retention time does not expire",
job: &job{
Done: true,
LastUpdate: now.Add(-1 * time.Minute),
},
shouldDelete: false,
},
{
name: "completed job after retention time and no listeners returns true",
name: "completed job after retention time and no listeners does expire",
job: &job{
Done: true,
LastUpdate: now.Add(-6 * time.Minute),
Expand All @@ -231,30 +231,30 @@ func TestShouldDeleteJob(t *testing.T) {
shouldDelete: true,
},
{
name: "incomplete job with no listeners within expiry time returns false",
name: "complete job with active listeners after 1x resumption time does expire",
job: &job{
Done: true,
LastUpdate: now.Add(-11 * time.Minute),
Streams: []chan *longrunning.Operation{make(chan *longrunning.Operation), make(chan *longrunning.Operation)},
},
shouldDelete: true,
},
{
name: "incomplete job with no listeners within expiry time does not expire",
job: &job{
Done: false,
LastUpdate: now.Add(-59 * time.Minute),
},
shouldDelete: false,
},
{
name: "incomplete job with no listeners after expiry time returns true",
name: "incomplete job with no listeners after 1x expiry time does expire",
job: &job{
Done: false,
LastUpdate: now.Add(-61 * time.Minute),
},
shouldDelete: true,
},
{
name: "complete job with active listeners after 1x expiry time does not expire",
job: &job{
Done: true,
LastUpdate: now.Add(-121 * time.Minute),
Streams: []chan *longrunning.Operation{make(chan *longrunning.Operation), make(chan *longrunning.Operation)},
},
shouldDelete: false,
},
{
name: "incomplete job with listeners after 2x expiry time does expire",
job: &job{
Expand All @@ -264,20 +264,11 @@ func TestShouldDeleteJob(t *testing.T) {
},
shouldDelete: true,
},
{
name: "complete job with active listeners after 1x expiry time does not expire",
job: &job{
Done: true,
LastUpdate: now.Add(-61 * time.Minute),
Streams: []chan *longrunning.Operation{make(chan *longrunning.Operation), make(chan *longrunning.Operation)},
},
shouldDelete: false,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.shouldDelete, shouldDeleteJob(test.job))
assert.Equal(t, test.shouldDelete, shouldDeleteJob(test.job, "1234"))
})
}
}
Expand Down

0 comments on commit f4c5b3d

Please sign in to comment.