Skip to content

Commit

Permalink
Update job deletion logic + propagate update times (#283)
Browse files Browse the repository at this point in the history
* Change update logic to match previous behaviour, update time in a few locations.

* Version + Changelog

---------

Co-authored-by: Hamish Pitkeathly <[email protected]>
  • Loading branch information
Garbett1 and Hamishpk authored Mar 6, 2024
1 parent 7764055 commit fa2b0e1
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 6 deletions.
6 changes: 5 additions & 1 deletion ChangeLog
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Version 11.9.0
--------------
* Revert job deletion change to match previous behaviour

Version 11.8.5
--------------
* Fix bug for periodically deleting jobs
* Fix bug for periodically deleting jobs

Version 11.8.4
--------------
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.8.5
11.9.0
9 changes: 7 additions & 2 deletions mettle/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string,
platform: allowedPlatform,
client: client,
numPollers: queueOpts.NumPollers,
deleteJobsTicker: time.NewTicker(10 * time.Minute),
deleteJobsTicker: time.NewTicker(30 * time.Second),
actuallyValidate: len(allowedPlatform) > 0,
}

Expand Down Expand Up @@ -385,6 +385,10 @@ func (s *server) WaitExecution(req *pb.WaitExecutionRequest, stream pb.Execution
func (s *server) streamEvents(digest *pb.Digest, ch <-chan *longrunning.Operation, stream pb.Execution_ExecuteServer) error {
for op := range ch {
op.Name = digest.Hash
s.mutex.Lock()
j := s.jobs[digest.Hash]
j.LastUpdate = time.Now()
s.mutex.Unlock()
if err := stream.Send(op); err != nil {
log.Warning("Failed to forward event for %s: %s", digest.Hash, err)
s.stopStream(digest, ch)
Expand Down Expand Up @@ -431,6 +435,7 @@ func (s *server) eventStream(digest *pb.Digest, create bool) (<-chan *longrunnin
// at best irrelevant and at worst outdated.
j.Current = nil
j.StartTime = time.Now()
j.LastUpdate = time.Now()
} else if j.Current != nil {
// This request is resuming an existing stream, give them an update on the latest thing to happen.
// This helps avoid 504s from taking too long to send response headers since it can be an arbitrary
Expand Down Expand Up @@ -590,7 +595,7 @@ func (s *server) periodicallyDeleteJobs() {

func shouldDeleteJob(j *job) bool {
timeSinceLastUpdate := time.Since(j.LastUpdate)
if j.Done && timeSinceLastUpdate > retentionTime {
if j.Done && len(j.Streams) == 0 && timeSinceLastUpdate > retentionTime {
return true
}
if !j.Done && len(j.Streams) == 0 && timeSinceLastUpdate > expiryTime {
Expand Down
24 changes: 22 additions & 2 deletions mettle/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,11 @@ func TestShouldDeleteJob(t *testing.T) {
shouldDelete: false,
},
{
name: "completed job after retention time returns true",
name: "completed job after retention time and no listeners returns true",
job: &job{
Done: true,
LastUpdate: now.Add(-6 * time.Minute),
Streams: []chan *longrunning.Operation{},
},
shouldDelete: true,
},
Expand All @@ -246,13 +247,32 @@ func TestShouldDeleteJob(t *testing.T) {
shouldDelete: true,
},
{
name: "incomplete job with listeners after 2x expiry time returns true",
name: "complete job with active listeners after 1x expiry time does not expire",
job: &job{
Done: true,
LastUpdate: now.Add(-121 * time.Minute),
Streams: []chan *longrunning.Operation{make(chan *longrunning.Operation), make(chan *longrunning.Operation)},
},
shouldDelete: false,
},
{
name: "incomplete job with listeners after 2x expiry time does expire",
job: &job{
Done: false,
LastUpdate: now.Add(-121 * time.Minute),
Streams: []chan *longrunning.Operation{make(chan *longrunning.Operation), make(chan *longrunning.Operation)},
},
shouldDelete: true,
},
{
name: "complete job with active listeners after 1x expiry time does not expire",
job: &job{
Done: true,
LastUpdate: now.Add(-61 * time.Minute),
Streams: []chan *longrunning.Operation{make(chan *longrunning.Operation), make(chan *longrunning.Operation)},
},
shouldDelete: false,
},
}

for _, test := range tests {
Expand Down

0 comments on commit fa2b0e1

Please sign in to comment.