Skip to content

Commit

Permalink
workflow: use heartbeats in acquire-pipeline
Browse files Browse the repository at this point in the history
It is cheaper to retry within the activity and let it run as long as it can
heartbeat. It reduces the pressure put in the workflow engine.
  • Loading branch information
sevein committed Apr 13, 2020
1 parent 291801a commit c7507e5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 15 deletions.
27 changes: 22 additions & 5 deletions internal/workflow/activities/acquire_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package activities

import (
"context"
"errors"
"fmt"
"time"

"go.uber.org/cadence/activity"

wferrors "github.com/artefactual-labs/enduro/internal/workflow/errors"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
"github.com/cenkalti/backoff/v3"
)

// AcquirePipelineActivity acquires a lock in the weighted semaphore associated
Expand All @@ -24,9 +28,22 @@ func (a *AcquirePipelineActivity) Execute(ctx context.Context, pipelineName stri
return wferrors.NonRetryableError(err)
}

if ok := p.TryAcquire(); !ok {
return errors.New("error acquiring pipeline")
}
var errAcquirePipeline = fmt.Errorf("error acquring semaphore: busy")

err = backoff.RetryNotify(
func() (err error) {
ok := p.TryAcquire()
if !ok {
err = errAcquirePipeline
}

return err
},
backoff.WithContext(backoff.NewConstantBackOff(time.Second*5), ctx),
func(err error, duration time.Duration) {
activity.RecordHeartbeat(ctx)
},
)

return nil
return err
}
1 change: 1 addition & 0 deletions internal/workflow/activities/activities.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package activities implements Enduro's workflow activities.
package activities

const (
Expand Down
12 changes: 2 additions & 10 deletions internal/workflow/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/artefactual-labs/enduro/internal/workflow/manager"

"github.com/go-logr/logr"
"go.uber.org/cadence"
"go.uber.org/cadence/workflow"
)

Expand All @@ -20,16 +19,9 @@ func acquirePipeline(ctx workflow.Context, manager *manager.Manager, pipelineNam
{
ctx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ScheduleToStartTimeout: forever,
StartToCloseTimeout: time.Second * 5,
WaitForCancellation: true,
RetryPolicy: &cadence.RetryPolicy{
InitialInterval: time.Second * 2,
BackoffCoefficient: 1,
MaximumInterval: time.Second * 2,
ExpirationInterval: forever,
},
StartToCloseTimeout: forever,
HeartbeatTimeout: time.Minute,
})

if err := workflow.ExecuteActivity(ctx, activities.AcquirePipelineActivityName, pipelineName).Get(ctx, nil); err != nil {
return fmt.Errorf("error acquiring pipeline: %w", err)
}
Expand Down

0 comments on commit c7507e5

Please sign in to comment.