Skip to content

Commit

Permalink
pipeline: use non-blocking acquire in semaphore
Browse files Browse the repository at this point in the history
Our dumb local semaphore has been showing a deadlock in staging. I can't
reproduce locally neither understand the reason. In this commit, I change the
activity so it uses the non-blocking version of the acquire method. As a result,
the activity will be executed more frequently (every two seconds). I can't see
how that is going to help but I'm running out of ideas.

We still aim to deal with proper resource usage management using monitoring
workflows but we haven't gotten to that just yet.
  • Loading branch information
sevein committed Feb 24, 2020
1 parent c24ca8e commit 79d379e
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
4 changes: 2 additions & 2 deletions internal/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ func (p Pipeline) Config() *Config {
return p.config
}

func (p *Pipeline) Acquire(ctx context.Context) error {
return p.sem.Acquire(ctx, 1)
func (p *Pipeline) TryAcquire() bool {
return p.sem.TryAcquire(1)
}

func (p *Pipeline) Release() {
Expand Down
9 changes: 5 additions & 4 deletions internal/workflow/activities/acquire_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package activities

import (
"context"
"time"
"errors"

wferrors "github.com/artefactual-labs/enduro/internal/workflow/errors"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
Expand All @@ -24,8 +24,9 @@ func (a *AcquirePipelineActivity) Execute(ctx context.Context, pipelineName stri
return wferrors.NonRetryableError(err)
}

ctx, cancel := context.WithTimeout(ctx, time.Second*4)
defer cancel()
if ok := p.TryAcquire(); !ok {
return errors.New("error acquiring pipeline")
}

return p.Acquire(ctx)
return nil
}
1 change: 1 addition & 0 deletions internal/workflow/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func acquirePipeline(ctx workflow.Context, manager *manager.Manager, pipelineNam
ctx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ScheduleToStartTimeout: forever,
StartToCloseTimeout: time.Second * 5,
WaitForCancellation: true,
RetryPolicy: &cadence.RetryPolicy{
InitialInterval: time.Second * 2,
BackoffCoefficient: 1,
Expand Down

0 comments on commit 79d379e

Please sign in to comment.