diff --git a/agent/agent_worker.go b/agent/agent_worker.go index 13bd2247c5..2a15c5c68d 100644 --- a/agent/agent_worker.go +++ b/agent/agent_worker.go @@ -550,6 +550,17 @@ func (a *AgentWorker) Ping(ctx context.Context) (*api.Job, error) { // state. If the job is in an unassignable state, it will return an error immediately. // Otherwise, it will retry every 3s for 30 s. The whole operation will timeout after 5 min. func (a *AgentWorker) AcquireAndRunJob(ctx context.Context, jobId string) error { + ctx, cancel := context.WithCancel(ctx) + go func() { + for { + time.Sleep(500 * time.Millisecond) + if a.stopping { + cancel() + return + } + } + }() + job, err := a.client.AcquireJob(ctx, jobId) if err != nil { return fmt.Errorf("failed to acquire job: %w", err) diff --git a/agent/agent_worker_test.go b/agent/agent_worker_test.go index 9c87589e77..b9fb208750 100644 --- a/agent/agent_worker_test.go +++ b/agent/agent_worker_test.go @@ -218,8 +218,8 @@ func TestAcquireAndRunJobWaiting(t *testing.T) { } // the last Retry-After is not recorded as the retries loop exits before using it - expectedSleeps := make([]time.Duration, 0, 9) - for d := 1; d <= 1<<8; d *= 2 { + expectedSleeps := make([]time.Duration, 0, 6) + for d := 1; d <= 1<<5; d *= 2 { expectedSleeps = append(expectedSleeps, time.Duration(d)*time.Second) } assert.Equal(t, expectedSleeps, retrySleeps) diff --git a/core/client.go b/core/client.go index 6edf1bc542..f2d34eedbb 100644 --- a/core/client.go +++ b/core/client.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "math/rand/v2" "net/http" "os" "runtime" @@ -48,7 +47,7 @@ type Client struct { // AcquireJob acquires a specific job from Buildkite. // It doesn't interpret or run the job - the caller is responsible for that. -// It contains a builtin timeout of 270 seconds and makes up to 10 attempts. +// It contains a builtin timeout of 330 seconds and makes up to 7 attempts, backing off exponentially. func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error) { c.Logger.Info("Attempting to acquire job %s...", jobID) @@ -56,19 +55,22 @@ func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error) // large if the job is in the waiting state. // // If there were no delays or jitter, the attempts would happen at t = 0, 1, 2, 4, ..., 128s - // after the initial one. Therefore, there are 9 attempts taking at least 255s. If the jitter - // always hit the max of 1s, then another 8s is added to that. This is still comfortably within - // the timeout of 270s, and the bound seems tight enough so that the agent is not wasting time + // after the initial one. Therefore, there are 7 attempts taking at least 255s. If the jitter + // always hit the max of 5s, then another 40s is added to that. This is still comfortably within + // the timeout of 330s, and the bound seems tight enough so that the agent is not wasting time // waiting for a retry that will never happen. - timeoutCtx, cancel := context.WithTimeout(ctx, 270*time.Second) + timeoutCtx, cancel := context.WithTimeout(ctx, 330*time.Second) defer cancel() - // Acquire the job using the ID we were provided. We'll retry as best we can on non 422 error. - // Except for 423 errors, in which we exponentially back off under the direction of the API - // setting the Retry-After header + // Acquire the job using the ID we were provided. + // We'll retry as best we can on non 5xx errors, as well as 423 Locked and 429 Too Many Requests. + // For retryable errors, if available, we'll consume the value of the server-defined `Retry-After` response header + // to determine our next retry interval. + // 4xx errors that are not 423 or 429 will not be retried. r := roko.NewRetrier( - roko.WithMaxAttempts(10), - roko.WithStrategy(roko.Constant(3*time.Second)), + roko.WithMaxAttempts(7), + roko.WithStrategy(roko.Exponential(2*time.Second, 0)), + roko.WithJitterRange(-1*time.Second, 5*time.Second), roko.WithSleepFunc(c.RetrySleepFunc), ) @@ -83,23 +85,40 @@ func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error) c.Logger.Warn("%s (%s)", err, r) return nil, err } - switch resp.StatusCode { - case http.StatusUnprocessableEntity: + + switch { + case resp.StatusCode == http.StatusLocked: + // If the API returns with a 423, the job is in the waiting state. Let's try again later. + warning := fmt.Sprintf("The job is waiting for a dependency: (%s)", err) + handleRetriableJobAcquisitionError(warning, resp, r, c.Logger) + return nil, err + + case resp.StatusCode == http.StatusTooManyRequests: + // We're being rate limited by the backend. Let's try again later. + warning := fmt.Sprintf("Rate limited by the backend: %s", err) + handleRetriableJobAcquisitionError(warning, resp, r, c.Logger) + return nil, err + + case resp.StatusCode >= 500: + // It's a 5xx. Probably worth retrying + warning := fmt.Sprintf("Server error: %s", err) + handleRetriableJobAcquisitionError(warning, resp, r, c.Logger) + return nil, err + + case resp.StatusCode == http.StatusUnprocessableEntity: // If the API returns with a 422, it usually means that the job is in a state where it can't be acquired - - // e.g. it's already running on another agent, or has been cancelled, or has already run - c.Logger.Warn("Buildkite rejected the call to acquire the job (%s)", err) + // e.g. it's already running on another agent, or has been cancelled, or has already run. Don't retry + c.Logger.Error("Buildkite rejected the call to acquire the job: %s", err) r.Break() return nil, fmt.Errorf("%w: %w", ErrJobAcquisitionRejected, err) - case http.StatusLocked: - // If the API returns with a 423, the job is in the waiting state - c.Logger.Warn("The job is waiting for a dependency (%s)", err) - duration, errParseDuration := time.ParseDuration(resp.Header.Get("Retry-After") + "s") - if errParseDuration != nil { - duration = time.Second + rand.N(time.Second) - } - r.SetNextInterval(duration) + case resp.StatusCode >= 400 && resp.StatusCode < 500: + // It's some other client error - not 429 or 423, which we retry, or 422, which we don't, but gets a special log message + // Don't retry it, the odds of success are low + c.Logger.Error("%s", err) + r.Break() + return nil, err default: @@ -112,6 +131,25 @@ func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error) }) } +func handleRetriableJobAcquisitionError(warning string, resp *api.Response, r *roko.Retrier, logger logger.Logger) { + logger.Warn("%s (%s)", warning, r) + if resp != nil { + retryAfter := resp.Header.Get("Retry-After") + + // Only customize the retry interval if the Retry-After header is present. Otherwise, keep using the default retrier settings + if retryAfter == "" { + return + } + + duration, errParseDuration := time.ParseDuration(retryAfter + "s") + if errParseDuration != nil { + return // use the default retrier settings + } + + r.SetNextInterval(duration) + } +} + // Connects the agent to the Buildkite Agent API, retrying up to 10 times with 5 // seconds delay if it fails. func (c *Client) Connect(ctx context.Context) error { diff --git a/go.mod b/go.mod index 259323f196..c28181fb4d 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/buildkite/bintest/v3 v3.3.0 github.com/buildkite/go-pipeline v0.13.3 github.com/buildkite/interpolate v0.1.5 - github.com/buildkite/roko v1.2.0 + github.com/buildkite/roko v1.3.0 github.com/buildkite/shellwords v0.0.0-20180315084142-c3f497d1e000 github.com/creack/pty v1.1.19 github.com/denisbrodbeck/machineid v1.0.1 diff --git a/go.sum b/go.sum index 2e44d4d519..a2f5ca5487 100644 --- a/go.sum +++ b/go.sum @@ -114,8 +114,8 @@ github.com/buildkite/go-pipeline v0.13.3 h1:llI7sAdZ7sqYE7r8ePlmDADRhJ1K0Kua2+gv github.com/buildkite/go-pipeline v0.13.3/go.mod h1:1uC2XdHkTV1G5jYv9K8omERIwrsYbBruBrPx1Zu1uFw= github.com/buildkite/interpolate v0.1.5 h1:v2Ji3voik69UZlbfoqzx+qfcsOKLA61nHdU79VV+tPU= github.com/buildkite/interpolate v0.1.5/go.mod h1:dHnrwHew5O8VNOAgMDpwRlFnhL5VSN6M1bHVmRZ9Ccc= -github.com/buildkite/roko v1.2.0 h1:hbNURz//dQqNl6Eo9awjQOVOZwSDJ8VEbBDxSfT9rGQ= -github.com/buildkite/roko v1.2.0/go.mod h1:23R9e6nHxgedznkwwfmqZ6+0VJZJZ2Sg/uVcp2cP46I= +github.com/buildkite/roko v1.3.0 h1:Lgv5XK0rr0uCCZQqssavdwjFs550j8ovyVmnnLMfS/E= +github.com/buildkite/roko v1.3.0/go.mod h1:23R9e6nHxgedznkwwfmqZ6+0VJZJZ2Sg/uVcp2cP46I= github.com/buildkite/shellwords v0.0.0-20180315084142-c3f497d1e000 h1:hiVSLk7s3yFKFOHF/huoShLqrj13RMguWX2yzfvy7es= github.com/buildkite/shellwords v0.0.0-20180315084142-c3f497d1e000/go.mod h1:gv0DYOzHEsKgo31lTCDGauIg4DTTGn41Bzp+t3wSOlk= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=