-
Notifications
You must be signed in to change notification settings - Fork 7
fix(managedstream): stop terminal ingest retry storms #294
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,8 @@ package managedstream | |
| import ( | ||
| "bytes" | ||
| "context" | ||
| "crypto/sha256" | ||
| "encoding/hex" | ||
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
|
|
@@ -28,6 +30,7 @@ const ( | |
| DefaultBatchLimit = 100 | ||
| MaxPayloadBytes = 192 * 1024 | ||
| DefaultInterval = 10 * time.Second | ||
| DefaultCooldown = 15 * time.Minute | ||
|
|
||
| maxPayloadSessions = 100 | ||
| maxPayloadActions = 1000 | ||
|
|
@@ -41,6 +44,14 @@ const ( | |
|
|
||
| var hostedErrorSecretPattern = regexp.MustCompile(`(?i)("(?:[^"]*(?:api[_-]?key|authorization|client[_-]?secret|credential|install[_-]?token|password|secret|token)[^"]*)"\s*:\s*")([^"]*)(")`) | ||
|
|
||
| type failureKind string | ||
|
|
||
| const ( | ||
| failureKindTerminalConfig failureKind = "terminal_config" | ||
| ) | ||
|
|
||
| type streamConfigFingerprint string | ||
|
|
||
| type Options struct { | ||
| DBPath string | ||
| StatePath string | ||
|
|
@@ -75,8 +86,14 @@ type Device struct { | |
| } | ||
|
|
||
| type State struct { | ||
| UpdatedAfter string `json:"updated_after,omitempty"` | ||
| ActionID string `json:"action_id,omitempty"` | ||
| UpdatedAfter string `json:"updated_after,omitempty"` | ||
| ActionID string `json:"action_id,omitempty"` | ||
| FailureKind failureKind `json:"failure_kind,omitempty"` | ||
| FailureStatus int `json:"failure_status,omitempty"` | ||
| FailureMessage string `json:"failure_message,omitempty"` | ||
| FailureConfig streamConfigFingerprint `json:"failure_config,omitempty"` | ||
| FailureCount int `json:"failure_count,omitempty"` | ||
| CooldownUntil string `json:"cooldown_until,omitempty"` | ||
| } | ||
|
|
||
| func Run(ctx context.Context, opts Options) error { | ||
|
|
@@ -124,6 +141,13 @@ func Flush(ctx context.Context, opts Options) error { | |
| if err != nil { | ||
| return err | ||
| } | ||
| configKey := configFingerprint(opts) | ||
| if state.FailureConfig != "" && state.FailureConfig != configKey { | ||
| state = clearFailureState(state) | ||
| if err := SaveState(statePath, state); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| var updatedAfter *time.Time | ||
| if state.UpdatedAfter != "" { | ||
|
|
@@ -134,6 +158,14 @@ func Flush(ctx context.Context, opts Options) error { | |
| updatedAfter = &parsed | ||
| } | ||
|
|
||
| active, err := cooldownActive(state, configKey, time.Now().UTC()) | ||
| if err != nil { | ||
| return fmt.Errorf("parse managed stream cooldown state: %w", err) | ||
| } | ||
| if active { | ||
| return fmt.Errorf("managed stream paused during cooldown after terminal hosted ingest failure until %s: %s", state.CooldownUntil, state.FailureMessage) | ||
| } | ||
|
|
||
| limit := batchLimit(opts.BatchLimit) | ||
| for { | ||
| batch, err := store.LedgerBatch(ctx, sqlite.LedgerExportOptions{ | ||
|
|
@@ -191,19 +223,35 @@ func Flush(ctx context.Context, opts Options) error { | |
|
|
||
| if err := post(ctx, opts, body); err != nil { | ||
| var hostedErr *hostedIngestError | ||
| if errors.As(err, &hostedErr) && shouldRetryWithSmallerBatch(hostedErr.StatusCode) { | ||
| if limit == 1 { | ||
| return err | ||
| if errors.As(err, &hostedErr) { | ||
| if shouldRetryWithSmallerBatch(hostedErr) { | ||
| if limit == 1 || len(batch.Actions) <= 1 { | ||
| if hostedErr.StatusCode != http.StatusRequestEntityTooLarge { | ||
| return recordTerminalConfigFailure(statePath, state, configKey, hostedErr) | ||
| } | ||
| return advancePastMinimumBatch( | ||
| statePath, | ||
| batch, | ||
| fmt.Sprintf("hosted status %d at minimum batch size", hostedErr.StatusCode), | ||
| hostedErr, | ||
| ) | ||
| } | ||
| nextLimit := reducedBatchLimit(limit) | ||
| opts.Diagnostic.Printf( | ||
| "managed stream hosted ingest returned status %d; reducing batch limit from %d to %d\n", | ||
| hostedErr.StatusCode, | ||
| limit, | ||
| nextLimit, | ||
| ) | ||
| limit = nextLimit | ||
| continue | ||
| } | ||
| if hostedErr.StatusCode == http.StatusBadRequest { | ||
| return recordTerminalConfigFailure(statePath, state, configKey, hostedErr) | ||
| } | ||
| if hostedErr.StatusCode == http.StatusUnprocessableEntity { | ||
| return recordTerminalConfigFailure(statePath, state, configKey, hostedErr) | ||
| } | ||
| nextLimit := reducedBatchLimit(limit) | ||
| opts.Diagnostic.Printf( | ||
| "managed stream hosted ingest returned status %d; reducing batch limit from %d to %d\n", | ||
| hostedErr.StatusCode, | ||
| limit, | ||
| nextLimit, | ||
| ) | ||
| limit = nextLimit | ||
| continue | ||
| } | ||
| return err | ||
| } | ||
|
|
@@ -281,10 +329,26 @@ func reducedBatchLimit(limit int) int { | |
| return next | ||
| } | ||
|
|
||
| func shouldRetryWithSmallerBatch(statusCode int) bool { | ||
| return statusCode == http.StatusBadRequest || | ||
| statusCode == http.StatusRequestEntityTooLarge || | ||
| statusCode == http.StatusUnprocessableEntity | ||
| func shouldRetryWithSmallerBatch(err *hostedIngestError) bool { | ||
| return err.StatusCode == http.StatusRequestEntityTooLarge || | ||
| ((err.StatusCode == http.StatusBadRequest || err.StatusCode == http.StatusUnprocessableEntity) && | ||
| isHostedPayloadSizeError(err.Body)) | ||
| } | ||
|
|
||
| func isHostedPayloadSizeError(body string) bool { | ||
| body = strings.ToLower(body) | ||
| if strings.Contains(body, "payload too large") || | ||
| strings.Contains(body, "request entity too large") { | ||
| return true | ||
| } | ||
| if !strings.Contains(body, "must contain no more than") && | ||
| !strings.Contains(body, "exceeds max") { | ||
| return false | ||
| } | ||
| return strings.Contains(body, "agent_sessions") || | ||
| strings.Contains(body, "authorization_actions") || | ||
| strings.Contains(body, "authorization_receipts") || | ||
| strings.Contains(body, "body_bytes") | ||
| } | ||
|
|
||
| func payloadLimitViolation(payload Payload, bodyBytes int) string { | ||
|
|
@@ -318,11 +382,59 @@ func advancePastMinimumBatch(statePath string, batch sqlite.LedgerBatch, reason | |
| return fmt.Errorf("managed stream advanced cursor past oversized minimum batch: %s", reason) | ||
| } | ||
|
|
||
| func recordTerminalConfigFailure(statePath string, state State, configKey streamConfigFingerprint, err *hostedIngestError) error { | ||
| state.FailureKind = failureKindTerminalConfig | ||
| state.FailureStatus = err.StatusCode | ||
| state.FailureMessage = redactHostedErrorBody(err.Body) | ||
| state.FailureConfig = configKey | ||
| state.FailureCount++ | ||
| state.CooldownUntil = time.Now().UTC().Add(DefaultCooldown).Format(time.RFC3339Nano) | ||
| if err := SaveState(statePath, state); err != nil { | ||
| return err | ||
| } | ||
| return fmt.Errorf("managed stream entered cooldown after terminal hosted ingest failure: %w", err) | ||
| } | ||
|
|
||
| func saveCursor(statePath string, batch sqlite.LedgerBatch) error { | ||
| return SaveState(statePath, State{ | ||
| UpdatedAfter: batch.Cursor.UpdatedAt.UTC().Format(time.RFC3339Nano), | ||
| ActionID: batch.Cursor.ActionID, | ||
| }) | ||
| return SaveState(statePath, stateWithCursor(State{}, batch)) | ||
| } | ||
|
|
||
| func stateWithCursor(state State, batch sqlite.LedgerBatch) State { | ||
| if batch.Cursor == nil { | ||
| return state | ||
| } | ||
| state.UpdatedAfter = batch.Cursor.UpdatedAt.UTC().Format(time.RFC3339Nano) | ||
| state.ActionID = batch.Cursor.ActionID | ||
| return state | ||
| } | ||
|
|
||
| func cooldownActive(state State, configKey streamConfigFingerprint, now time.Time) (bool, error) { | ||
| if state.FailureKind != failureKindTerminalConfig || state.FailureConfig != configKey || state.CooldownUntil == "" { | ||
| return false, nil | ||
| } | ||
| until, err := time.Parse(time.RFC3339Nano, state.CooldownUntil) | ||
| if err != nil { | ||
| return false, err | ||
| } | ||
| return now.Before(until), nil | ||
| } | ||
|
|
||
| func clearFailureState(state State) State { | ||
| return State{ | ||
| UpdatedAfter: state.UpdatedAfter, | ||
| ActionID: state.ActionID, | ||
| } | ||
| } | ||
|
|
||
| func configFingerprint(opts Options) streamConfigFingerprint { | ||
| values := strings.Join([]string{ | ||
| strings.TrimSpace(opts.CloudURL), | ||
| strings.TrimSpace(opts.OrganizationID), | ||
| strings.TrimSpace(opts.InstallationID), | ||
| strings.TrimSpace(opts.InstallToken), | ||
| }, "\x00") | ||
|
Comment on lines
+430
to
+435
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| sum := sha256.Sum256([]byte(values)) | ||
| return streamConfigFingerprint(hex.EncodeToString(sum[:])) | ||
| } | ||
|
|
||
| func parseStateUpdatedAfter(value string) (time.Time, error) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once `CooldownUntil` passes, `cooldownActive` returns false for the same `FailureConfig`, so `Flush` posts the same terminally rejected rows again and then records a new 15-minute cooldown. For an organization mismatch or a schema/config 400, this keeps replaying the same unsent ledger data every cooldown window even though nothing has changed, which only slows the retry storm instead of stopping terminal failures until inputs change.