From c7507e55543e239d46936d1c4cffa43f1bc255b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Mon, 13 Apr 2020 11:38:36 -0700 Subject: [PATCH] workflow: use heartbeats in acquire-pipeline It is cheaper to retry within the activity and let it run as long as it can heartbeat. It reduces the pressure put in the workflow engine. --- .../workflow/activities/acquire_pipeline.go | 27 +++++++++++++++---- internal/workflow/activities/activities.go | 1 + internal/workflow/semaphore.go | 12 ++------- 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/internal/workflow/activities/acquire_pipeline.go b/internal/workflow/activities/acquire_pipeline.go index 76b78afb..d0b32176 100644 --- a/internal/workflow/activities/acquire_pipeline.go +++ b/internal/workflow/activities/acquire_pipeline.go @@ -2,10 +2,14 @@ package activities import ( "context" - "errors" + "fmt" + "time" + + "go.uber.org/cadence/activity" wferrors "github.com/artefactual-labs/enduro/internal/workflow/errors" "github.com/artefactual-labs/enduro/internal/workflow/manager" + "github.com/cenkalti/backoff/v3" ) // AcquirePipelineActivity acquires a lock in the weighted semaphore associated @@ -24,9 +28,22 @@ func (a *AcquirePipelineActivity) Execute(ctx context.Context, pipelineName stri return wferrors.NonRetryableError(err) } - if ok := p.TryAcquire(); !ok { - return errors.New("error acquiring pipeline") - } + var errAcquirePipeline = fmt.Errorf("error acquring semaphore: busy") + + err = backoff.RetryNotify( + func() (err error) { + ok := p.TryAcquire() + if !ok { + err = errAcquirePipeline + } + + return err + }, + backoff.WithContext(backoff.NewConstantBackOff(time.Second*5), ctx), + func(err error, duration time.Duration) { + activity.RecordHeartbeat(ctx) + }, + ) - return nil + return err } diff --git a/internal/workflow/activities/activities.go b/internal/workflow/activities/activities.go index cf346642..cc3b1f9d 100644 --- a/internal/workflow/activities/activities.go +++ b/internal/workflow/activities/activities.go @@ -1,3 +1,4 @@ +// Package activities implements Enduro's workflow activities. package activities const ( diff --git a/internal/workflow/semaphore.go b/internal/workflow/semaphore.go index 5fcd86cf..4efafe88 100644 --- a/internal/workflow/semaphore.go +++ b/internal/workflow/semaphore.go @@ -11,7 +11,6 @@ import ( "github.com/artefactual-labs/enduro/internal/workflow/manager" "github.com/go-logr/logr" - "go.uber.org/cadence" "go.uber.org/cadence/workflow" ) @@ -20,16 +19,9 @@ func acquirePipeline(ctx workflow.Context, manager *manager.Manager, pipelineNam { ctx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ ScheduleToStartTimeout: forever, - StartToCloseTimeout: time.Second * 5, - WaitForCancellation: true, - RetryPolicy: &cadence.RetryPolicy{ - InitialInterval: time.Second * 2, - BackoffCoefficient: 1, - MaximumInterval: time.Second * 2, - ExpirationInterval: forever, - }, + StartToCloseTimeout: forever, + HeartbeatTimeout: time.Minute, }) - if err := workflow.ExecuteActivity(ctx, activities.AcquirePipelineActivityName, pipelineName).Get(ctx, nil); err != nil { return fmt.Errorf("error acquiring pipeline: %w", err) }