Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 134 additions & 22 deletions internal/managedstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package managedstream
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand All @@ -28,6 +30,7 @@ const (
DefaultBatchLimit = 100
MaxPayloadBytes = 192 * 1024
DefaultInterval = 10 * time.Second
DefaultCooldown = 15 * time.Minute

maxPayloadSessions = 100
maxPayloadActions = 1000
Expand All @@ -41,6 +44,14 @@ const (

var hostedErrorSecretPattern = regexp.MustCompile(`(?i)("(?:[^"]*(?:api[_-]?key|authorization|client[_-]?secret|credential|install[_-]?token|password|secret|token)[^"]*)"\s*:\s*")([^"]*)(")`)

type failureKind string

const (
failureKindTerminalConfig failureKind = "terminal_config"
)

type streamConfigFingerprint string

type Options struct {
DBPath string
StatePath string
Expand Down Expand Up @@ -75,8 +86,14 @@ type Device struct {
}

type State struct {
UpdatedAfter string `json:"updated_after,omitempty"`
ActionID string `json:"action_id,omitempty"`
UpdatedAfter string `json:"updated_after,omitempty"`
ActionID string `json:"action_id,omitempty"`
FailureKind failureKind `json:"failure_kind,omitempty"`
FailureStatus int `json:"failure_status,omitempty"`
FailureMessage string `json:"failure_message,omitempty"`
FailureConfig streamConfigFingerprint `json:"failure_config,omitempty"`
FailureCount int `json:"failure_count,omitempty"`
CooldownUntil string `json:"cooldown_until,omitempty"`
}

func Run(ctx context.Context, opts Options) error {
Expand Down Expand Up @@ -124,6 +141,13 @@ func Flush(ctx context.Context, opts Options) error {
if err != nil {
return err
}
configKey := configFingerprint(opts)
if state.FailureConfig != "" && state.FailureConfig != configKey {
state = clearFailureState(state)
if err := SaveState(statePath, state); err != nil {
return err
}
}

var updatedAfter *time.Time
if state.UpdatedAfter != "" {
Expand All @@ -134,6 +158,14 @@ func Flush(ctx context.Context, opts Options) error {
updatedAfter = &parsed
}

active, err := cooldownActive(state, configKey, time.Now().UTC())
if err != nil {
return fmt.Errorf("parse managed stream cooldown state: %w", err)
}
if active {
return fmt.Errorf("managed stream paused during cooldown after terminal hosted ingest failure until %s: %s", state.CooldownUntil, state.FailureMessage)
}

limit := batchLimit(opts.BatchLimit)
for {
batch, err := store.LedgerBatch(ctx, sqlite.LedgerExportOptions{
Expand Down Expand Up @@ -191,19 +223,35 @@ func Flush(ctx context.Context, opts Options) error {

if err := post(ctx, opts, body); err != nil {
var hostedErr *hostedIngestError
if errors.As(err, &hostedErr) && shouldRetryWithSmallerBatch(hostedErr.StatusCode) {
if limit == 1 {
return err
if errors.As(err, &hostedErr) {
if shouldRetryWithSmallerBatch(hostedErr) {
if limit == 1 || len(batch.Actions) <= 1 {
if hostedErr.StatusCode != http.StatusRequestEntityTooLarge {
return recordTerminalConfigFailure(statePath, state, configKey, hostedErr)
}
return advancePastMinimumBatch(
statePath,
batch,
fmt.Sprintf("hosted status %d at minimum batch size", hostedErr.StatusCode),
hostedErr,
)
}
nextLimit := reducedBatchLimit(limit)
opts.Diagnostic.Printf(
"managed stream hosted ingest returned status %d; reducing batch limit from %d to %d\n",
hostedErr.StatusCode,
limit,
nextLimit,
)
limit = nextLimit
continue
}
if hostedErr.StatusCode == http.StatusBadRequest {
return recordTerminalConfigFailure(statePath, state, configKey, hostedErr)
}
if hostedErr.StatusCode == http.StatusUnprocessableEntity {
return recordTerminalConfigFailure(statePath, state, configKey, hostedErr)
}
nextLimit := reducedBatchLimit(limit)
opts.Diagnostic.Printf(
"managed stream hosted ingest returned status %d; reducing batch limit from %d to %d\n",
hostedErr.StatusCode,
limit,
nextLimit,
)
limit = nextLimit
continue
}
return err
}
Expand Down Expand Up @@ -281,10 +329,26 @@ func reducedBatchLimit(limit int) int {
return next
}

func shouldRetryWithSmallerBatch(statusCode int) bool {
return statusCode == http.StatusBadRequest ||
statusCode == http.StatusRequestEntityTooLarge ||
statusCode == http.StatusUnprocessableEntity
func shouldRetryWithSmallerBatch(err *hostedIngestError) bool {
return err.StatusCode == http.StatusRequestEntityTooLarge ||
((err.StatusCode == http.StatusBadRequest || err.StatusCode == http.StatusUnprocessableEntity) &&
isHostedPayloadSizeError(err.Body))
}

func isHostedPayloadSizeError(body string) bool {
body = strings.ToLower(body)
if strings.Contains(body, "payload too large") ||
strings.Contains(body, "request entity too large") {
return true
}
if !strings.Contains(body, "must contain no more than") &&
!strings.Contains(body, "exceeds max") {
return false
}
return strings.Contains(body, "agent_sessions") ||
strings.Contains(body, "authorization_actions") ||
strings.Contains(body, "authorization_receipts") ||
strings.Contains(body, "body_bytes")
}

func payloadLimitViolation(payload Payload, bodyBytes int) string {
Expand Down Expand Up @@ -318,11 +382,59 @@ func advancePastMinimumBatch(statePath string, batch sqlite.LedgerBatch, reason
return fmt.Errorf("managed stream advanced cursor past oversized minimum batch: %s", reason)
}

func recordTerminalConfigFailure(statePath string, state State, configKey streamConfigFingerprint, err *hostedIngestError) error {
state.FailureKind = failureKindTerminalConfig
state.FailureStatus = err.StatusCode
state.FailureMessage = redactHostedErrorBody(err.Body)
state.FailureConfig = configKey
state.FailureCount++
state.CooldownUntil = time.Now().UTC().Add(DefaultCooldown).Format(time.RFC3339Nano)
if err := SaveState(statePath, state); err != nil {
return err
}
return fmt.Errorf("managed stream entered cooldown after terminal hosted ingest failure: %w", err)
}

func saveCursor(statePath string, batch sqlite.LedgerBatch) error {
return SaveState(statePath, State{
UpdatedAfter: batch.Cursor.UpdatedAt.UTC().Format(time.RFC3339Nano),
ActionID: batch.Cursor.ActionID,
})
return SaveState(statePath, stateWithCursor(State{}, batch))
}

func stateWithCursor(state State, batch sqlite.LedgerBatch) State {
if batch.Cursor == nil {
return state
}
state.UpdatedAfter = batch.Cursor.UpdatedAt.UTC().Format(time.RFC3339Nano)
state.ActionID = batch.Cursor.ActionID
return state
}

func cooldownActive(state State, configKey streamConfigFingerprint, now time.Time) (bool, error) {
if state.FailureKind != failureKindTerminalConfig || state.FailureConfig != configKey || state.CooldownUntil == "" {
return false, nil
}
until, err := time.Parse(time.RFC3339Nano, state.CooldownUntil)
if err != nil {
return false, err
}
return now.Before(until), nil

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Cooldown still retries After CooldownUntil passes, cooldownActive returns false for the same FailureConfig, so Flush posts the same terminally rejected rows again and then records a new 15 minute cooldown. For an organization mismatch or schema/config 400, this keeps replaying the same unsent ledger data every cooldown window even though nothing changed, which only slows the retry storm instead of stopping terminal failures until inputs change.

}

func clearFailureState(state State) State {
return State{
UpdatedAfter: state.UpdatedAfter,
ActionID: state.ActionID,
}
}

func configFingerprint(opts Options) streamConfigFingerprint {
values := strings.Join([]string{
strings.TrimSpace(opts.CloudURL),
strings.TrimSpace(opts.OrganizationID),
strings.TrimSpace(opts.InstallationID),
strings.TrimSpace(opts.InstallToken),
}, "\x00")
Comment on lines +430 to +435

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Fingerprint misses payload config The persisted failure only clears when this fingerprint changes, but the request payload also includes DeviceLabel and the per-flush DeploymentVersion(). If hosted rejects device.label or device.deployment_version with a terminal 400/422, fixing that effective upload config does not change FailureConfig, and the cooldown check runs before the payload is rebuilt. The stream can remain paused even after the bad device/deployment value was corrected.

sum := sha256.Sum256([]byte(values))
return streamConfigFingerprint(hex.EncodeToString(sum[:]))
}

func parseStateUpdatedAfter(value string) (time.Time, error) {
Expand Down
Loading
Loading