Skip to content

Commit

Permalink
Merge pull request #3153 from buildkite/expo-backoff-for-acquire
Browse files Browse the repository at this point in the history
PIPE-631 Backoff exponentially when job acquisition fails
  • Loading branch information
moskyb authored Jan 9, 2025
2 parents b6015a8 + d52a6d0 commit d850b29
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 28 deletions.
11 changes: 11 additions & 0 deletions agent/agent_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,17 @@ func (a *AgentWorker) Ping(ctx context.Context) (*api.Job, error) {
// state. If the job is in an unassignable state, it will return an error immediately.
// Otherwise, it will retry every 3s for 30 s. The whole operation will timeout after 5 min.
func (a *AgentWorker) AcquireAndRunJob(ctx context.Context, jobId string) error {
ctx, cancel := context.WithCancel(ctx)
go func() {
for {
time.Sleep(500 * time.Millisecond)
if a.stopping {
cancel()
return
}
}
}()

job, err := a.client.AcquireJob(ctx, jobId)
if err != nil {
return fmt.Errorf("failed to acquire job: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions agent/agent_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ func TestAcquireAndRunJobWaiting(t *testing.T) {
}

// the last Retry-After is not recorded as the retries loop exits before using it
expectedSleeps := make([]time.Duration, 0, 9)
for d := 1; d <= 1<<8; d *= 2 {
expectedSleeps := make([]time.Duration, 0, 6)
for d := 1; d <= 1<<5; d *= 2 {
expectedSleeps = append(expectedSleeps, time.Duration(d)*time.Second)
}
assert.Equal(t, expectedSleeps, retrySleeps)
Expand Down
84 changes: 61 additions & 23 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"math/rand/v2"
"net/http"
"os"
"runtime"
Expand Down Expand Up @@ -48,27 +47,30 @@ type Client struct {

// AcquireJob acquires a specific job from Buildkite.
// It doesn't interpret or run the job - the caller is responsible for that.
// It contains a builtin timeout of 270 seconds and makes up to 10 attempts.
// It contains a builtin timeout of 330 seconds and makes up to 7 attempts, backing off exponentially.
func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error) {
c.Logger.Info("Attempting to acquire job %s...", jobID)

// Timeout the context to prevent the exponential backoff from growing too
// large if the job is in the waiting state.
//
// If there were no delays or jitter, the attempts would happen at t = 0, 1, 2, 4, ..., 128s
// after the initial one. Therefore, there are 9 attempts taking at least 255s. If the jitter
// always hit the max of 1s, then another 8s is added to that. This is still comfortably within
// the timeout of 270s, and the bound seems tight enough so that the agent is not wasting time
// after the initial one. Therefore, there are 7 attempts taking at least 255s. If the jitter
// always hit the max of 5s, then another 40s is added to that. This is still comfortably within
// the timeout of 330s, and the bound seems tight enough so that the agent is not wasting time
// waiting for a retry that will never happen.
timeoutCtx, cancel := context.WithTimeout(ctx, 270*time.Second)
timeoutCtx, cancel := context.WithTimeout(ctx, 330*time.Second)
defer cancel()

// Acquire the job using the ID we were provided. We'll retry as best we can on non 422 error.
// Except for 423 errors, in which we exponentially back off under the direction of the API
// setting the Retry-After header
// Acquire the job using the ID we were provided.
// We'll retry as best we can on non 5xx errors, as well as 423 Locked and 429 Too Many Requests.
// For retryable errors, if available, we'll consume the value of the server-defined `Retry-After` response header
// to determine our next retry interval.
// 4xx errors that are not 423 or 429 will not be retried.
r := roko.NewRetrier(
roko.WithMaxAttempts(10),
roko.WithStrategy(roko.Constant(3*time.Second)),
roko.WithMaxAttempts(7),
roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
roko.WithJitterRange(-1*time.Second, 5*time.Second),
roko.WithSleepFunc(c.RetrySleepFunc),
)

Expand All @@ -83,23 +85,40 @@ func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error)
c.Logger.Warn("%s (%s)", err, r)
return nil, err
}
switch resp.StatusCode {
case http.StatusUnprocessableEntity:

switch {
case resp.StatusCode == http.StatusLocked:
// If the API returns with a 423, the job is in the waiting state. Let's try again later.
warning := fmt.Sprintf("The job is waiting for a dependency: (%s)", err)
handleRetriableJobAcquisitionError(warning, resp, r, c.Logger)
return nil, err

case resp.StatusCode == http.StatusTooManyRequests:
// We're being rate limited by the backend. Let's try again later.
warning := fmt.Sprintf("Rate limited by the backend: %s", err)
handleRetriableJobAcquisitionError(warning, resp, r, c.Logger)
return nil, err

case resp.StatusCode >= 500:
// It's a 5xx. Probably worth retrying
warning := fmt.Sprintf("Server error: %s", err)
handleRetriableJobAcquisitionError(warning, resp, r, c.Logger)
return nil, err

case resp.StatusCode == http.StatusUnprocessableEntity:
// If the API returns with a 422, it usually means that the job is in a state where it can't be acquired -
// e.g. it's already running on another agent, or has been cancelled, or has already run
c.Logger.Warn("Buildkite rejected the call to acquire the job (%s)", err)
// e.g. it's already running on another agent, or has been cancelled, or has already run. Don't retry
c.Logger.Error("Buildkite rejected the call to acquire the job: %s", err)
r.Break()

return nil, fmt.Errorf("%w: %w", ErrJobAcquisitionRejected, err)

case http.StatusLocked:
// If the API returns with a 423, the job is in the waiting state
c.Logger.Warn("The job is waiting for a dependency (%s)", err)
duration, errParseDuration := time.ParseDuration(resp.Header.Get("Retry-After") + "s")
if errParseDuration != nil {
duration = time.Second + rand.N(time.Second)
}
r.SetNextInterval(duration)
case resp.StatusCode >= 400 && resp.StatusCode < 500:
// It's some other client error - not 429 or 423, which we retry, or 422, which we don't, but gets a special log message
// Don't retry it, the odds of success are low
c.Logger.Error("%s", err)
r.Break()

return nil, err

default:
Expand All @@ -112,6 +131,25 @@ func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error)
})
}

func handleRetriableJobAcquisitionError(warning string, resp *api.Response, r *roko.Retrier, logger logger.Logger) {
logger.Warn("%s (%s)", warning, r)
if resp != nil {
retryAfter := resp.Header.Get("Retry-After")

// Only customize the retry interval if the Retry-After header is present. Otherwise, keep using the default retrier settings
if retryAfter == "" {
return
}

duration, errParseDuration := time.ParseDuration(retryAfter + "s")
if errParseDuration != nil {
return // use the default retrier settings
}

r.SetNextInterval(duration)
}
}

// Connects the agent to the Buildkite Agent API, retrying up to 10 times with 5
// seconds delay if it fails.
func (c *Client) Connect(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/buildkite/bintest/v3 v3.3.0
github.com/buildkite/go-pipeline v0.13.3
github.com/buildkite/interpolate v0.1.5
github.com/buildkite/roko v1.2.0
github.com/buildkite/roko v1.3.0
github.com/buildkite/shellwords v0.0.0-20180315084142-c3f497d1e000
github.com/creack/pty v1.1.19
github.com/denisbrodbeck/machineid v1.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ github.com/buildkite/go-pipeline v0.13.3 h1:llI7sAdZ7sqYE7r8ePlmDADRhJ1K0Kua2+gv
github.com/buildkite/go-pipeline v0.13.3/go.mod h1:1uC2XdHkTV1G5jYv9K8omERIwrsYbBruBrPx1Zu1uFw=
github.com/buildkite/interpolate v0.1.5 h1:v2Ji3voik69UZlbfoqzx+qfcsOKLA61nHdU79VV+tPU=
github.com/buildkite/interpolate v0.1.5/go.mod h1:dHnrwHew5O8VNOAgMDpwRlFnhL5VSN6M1bHVmRZ9Ccc=
github.com/buildkite/roko v1.2.0 h1:hbNURz//dQqNl6Eo9awjQOVOZwSDJ8VEbBDxSfT9rGQ=
github.com/buildkite/roko v1.2.0/go.mod h1:23R9e6nHxgedznkwwfmqZ6+0VJZJZ2Sg/uVcp2cP46I=
github.com/buildkite/roko v1.3.0 h1:Lgv5XK0rr0uCCZQqssavdwjFs550j8ovyVmnnLMfS/E=
github.com/buildkite/roko v1.3.0/go.mod h1:23R9e6nHxgedznkwwfmqZ6+0VJZJZ2Sg/uVcp2cP46I=
github.com/buildkite/shellwords v0.0.0-20180315084142-c3f497d1e000 h1:hiVSLk7s3yFKFOHF/huoShLqrj13RMguWX2yzfvy7es=
github.com/buildkite/shellwords v0.0.0-20180315084142-c3f497d1e000/go.mod h1:gv0DYOzHEsKgo31lTCDGauIg4DTTGn41Bzp+t3wSOlk=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
Expand Down

0 comments on commit d850b29

Please sign in to comment.