diff --git a/internal/managedstream/stream.go b/internal/managedstream/stream.go index 679065bd..bb05d0fc 100644 --- a/internal/managedstream/stream.go +++ b/internal/managedstream/stream.go @@ -3,6 +3,8 @@ package managedstream import ( "bytes" "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -28,6 +30,7 @@ const ( DefaultBatchLimit = 100 MaxPayloadBytes = 192 * 1024 DefaultInterval = 10 * time.Second + DefaultCooldown = 15 * time.Minute maxPayloadSessions = 100 maxPayloadActions = 1000 @@ -41,6 +44,14 @@ const ( var hostedErrorSecretPattern = regexp.MustCompile(`(?i)("(?:[^"]*(?:api[_-]?key|authorization|client[_-]?secret|credential|install[_-]?token|password|secret|token)[^"]*)"\s*:\s*")([^"]*)(")`) +type failureKind string + +const ( + failureKindTerminalConfig failureKind = "terminal_config" +) + +type streamConfigFingerprint string + type Options struct { DBPath string StatePath string @@ -75,8 +86,14 @@ type Device struct { } type State struct { - UpdatedAfter string `json:"updated_after,omitempty"` - ActionID string `json:"action_id,omitempty"` + UpdatedAfter string `json:"updated_after,omitempty"` + ActionID string `json:"action_id,omitempty"` + FailureKind failureKind `json:"failure_kind,omitempty"` + FailureStatus int `json:"failure_status,omitempty"` + FailureMessage string `json:"failure_message,omitempty"` + FailureConfig streamConfigFingerprint `json:"failure_config,omitempty"` + FailureCount int `json:"failure_count,omitempty"` + CooldownUntil string `json:"cooldown_until,omitempty"` } func Run(ctx context.Context, opts Options) error { @@ -124,6 +141,13 @@ func Flush(ctx context.Context, opts Options) error { if err != nil { return err } + configKey := configFingerprint(opts) + if state.FailureConfig != "" && state.FailureConfig != configKey { + state = clearFailureState(state) + if err := SaveState(statePath, state); err != nil { + return err + } + } var updatedAfter *time.Time if state.UpdatedAfter != "" { @@ -134,6 +158,14 @@ func Flush(ctx context.Context, opts Options) error { updatedAfter = &parsed } + active, err := cooldownActive(state, configKey, time.Now().UTC()) + if err != nil { + return fmt.Errorf("parse managed stream cooldown state: %w", err) + } + if active { + return fmt.Errorf("managed stream paused during cooldown after terminal hosted ingest failure until %s: %s", state.CooldownUntil, state.FailureMessage) + } + limit := batchLimit(opts.BatchLimit) for { batch, err := store.LedgerBatch(ctx, sqlite.LedgerExportOptions{ @@ -191,19 +223,35 @@ func Flush(ctx context.Context, opts Options) error { if err := post(ctx, opts, body); err != nil { var hostedErr *hostedIngestError - if errors.As(err, &hostedErr) && shouldRetryWithSmallerBatch(hostedErr.StatusCode) { - if limit == 1 { - return err + if errors.As(err, &hostedErr) { + if shouldRetryWithSmallerBatch(hostedErr) { + if limit == 1 || len(batch.Actions) <= 1 { + if hostedErr.StatusCode != http.StatusRequestEntityTooLarge { + return recordTerminalConfigFailure(statePath, state, configKey, hostedErr) + } + return advancePastMinimumBatch( + statePath, + batch, + fmt.Sprintf("hosted status %d at minimum batch size", hostedErr.StatusCode), + hostedErr, + ) + } + nextLimit := reducedBatchLimit(limit) + opts.Diagnostic.Printf( + "managed stream hosted ingest returned status %d; reducing batch limit from %d to %d\n", + hostedErr.StatusCode, + limit, + nextLimit, + ) + limit = nextLimit + continue + } + if hostedErr.StatusCode == http.StatusBadRequest { + return recordTerminalConfigFailure(statePath, state, configKey, hostedErr) + } + if hostedErr.StatusCode == http.StatusUnprocessableEntity { + return recordTerminalConfigFailure(statePath, state, configKey, hostedErr) } - nextLimit := reducedBatchLimit(limit) - opts.Diagnostic.Printf( - "managed stream hosted ingest returned status %d; reducing batch limit from %d to %d\n", - hostedErr.StatusCode, - limit, - nextLimit, - ) - limit = nextLimit - continue } return err } @@ -281,10 +329,26 @@ func reducedBatchLimit(limit int) int { return next } -func shouldRetryWithSmallerBatch(statusCode int) bool { - return statusCode == http.StatusBadRequest || - statusCode == http.StatusRequestEntityTooLarge || - statusCode == http.StatusUnprocessableEntity +func shouldRetryWithSmallerBatch(err *hostedIngestError) bool { + return err.StatusCode == http.StatusRequestEntityTooLarge || + ((err.StatusCode == http.StatusBadRequest || err.StatusCode == http.StatusUnprocessableEntity) && + isHostedPayloadSizeError(err.Body)) +} + +func isHostedPayloadSizeError(body string) bool { + body = strings.ToLower(body) + if strings.Contains(body, "payload too large") || + strings.Contains(body, "request entity too large") { + return true + } + if !strings.Contains(body, "must contain no more than") && + !strings.Contains(body, "exceeds max") { + return false + } + return strings.Contains(body, "agent_sessions") || + strings.Contains(body, "authorization_actions") || + strings.Contains(body, "authorization_receipts") || + strings.Contains(body, "body_bytes") } func payloadLimitViolation(payload Payload, bodyBytes int) string { @@ -318,11 +382,59 @@ func advancePastMinimumBatch(statePath string, batch sqlite.LedgerBatch, reason return fmt.Errorf("managed stream advanced cursor past oversized minimum batch: %s", reason) } +func recordTerminalConfigFailure(statePath string, state State, configKey streamConfigFingerprint, err *hostedIngestError) error { + state.FailureKind = failureKindTerminalConfig + state.FailureStatus = err.StatusCode + state.FailureMessage = redactHostedErrorBody(err.Body) + state.FailureConfig = configKey + state.FailureCount++ + state.CooldownUntil = time.Now().UTC().Add(DefaultCooldown).Format(time.RFC3339Nano) + if err := SaveState(statePath, state); err != nil { + return err + } + return fmt.Errorf("managed stream entered cooldown after terminal hosted ingest failure: %w", err) +} + func saveCursor(statePath string, batch sqlite.LedgerBatch) error { - return SaveState(statePath, State{ - UpdatedAfter: batch.Cursor.UpdatedAt.UTC().Format(time.RFC3339Nano), - ActionID: batch.Cursor.ActionID, - }) + return SaveState(statePath, stateWithCursor(State{}, batch)) +} + +func stateWithCursor(state State, batch sqlite.LedgerBatch) State { + if batch.Cursor == nil { + return state + } + state.UpdatedAfter = batch.Cursor.UpdatedAt.UTC().Format(time.RFC3339Nano) + state.ActionID = batch.Cursor.ActionID + return state +} + +func cooldownActive(state State, configKey streamConfigFingerprint, now time.Time) (bool, error) { + if state.FailureKind != failureKindTerminalConfig || state.FailureConfig != configKey || state.CooldownUntil == "" { + return false, nil + } + until, err := time.Parse(time.RFC3339Nano, state.CooldownUntil) + if err != nil { + return false, err + } + return now.Before(until), nil +} + +func clearFailureState(state State) State { + return State{ + UpdatedAfter: state.UpdatedAfter, + ActionID: state.ActionID, + } +} + +func configFingerprint(opts Options) streamConfigFingerprint { + values := strings.Join([]string{ + strings.TrimSpace(opts.CloudURL), + strings.TrimSpace(opts.OrganizationID), + strings.TrimSpace(opts.InstallationID), + strings.TrimSpace(opts.InstallToken), + }, "\x00") + sum := sha256.Sum256([]byte(values)) + return streamConfigFingerprint(hex.EncodeToString(sum[:])) } func parseStateUpdatedAfter(value string) (time.Time, error) { diff --git a/internal/managedstream/stream_test.go b/internal/managedstream/stream_test.go index 4198f91f..87071b11 100644 --- a/internal/managedstream/stream_test.go +++ b/internal/managedstream/stream_test.go @@ -250,9 +250,10 @@ func TestFlushReportsHostedValidationBody(t *testing.T) { })) t.Cleanup(server.Close) + statePath := filepath.Join(t.TempDir(), "stream-state.json") err := Flush(context.Background(), Options{ DBPath: dbPath, - StatePath: filepath.Join(t.TempDir(), "stream-state.json"), + StatePath: statePath, CloudURL: server.URL, OrganizationID: "org_123", InstallationID: "ins_0123456789abcdefghijklmnopqrstuv", @@ -267,6 +268,330 @@ func TestFlushReportsHostedValidationBody(t *testing.T) { !strings.Contains(err.Error(), "authorization_actions must contain no more than 1000 records") { t.Fatalf("Flush() error = %q, want status and response body", err.Error()) } + state, err := LoadState(statePath) + if err != nil { + t.Fatalf("LoadState() error = %v", err) + } + if state.UpdatedAfter != "" || state.ActionID != "" || state.FailureKind != failureKindTerminalConfig { + t.Fatalf("state = %+v, want minimum hosted validation failure paused without cursor advance", state) + } +} + +func TestFlushOrgMismatch400EntersCooldownAndDoesNotRepost(t *testing.T) { + store, dbPath := testStore(t) + saveTestDecision(t, store, "session-1", "toolu_1") + + requests := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + requests++ + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"message":"authorization ledger organization_id does not match authenticated principal","error":"Bad Request","statusCode":400}`)) + })) + t.Cleanup(server.Close) + + statePath := filepath.Join(t.TempDir(), "stream-state.json") + opts := Options{ + DBPath: dbPath, + StatePath: statePath, + CloudURL: server.URL, + OrganizationID: "org_123", + InstallationID: "ins_0123456789abcdefghijklmnopqrstuv", + InstallToken: "test-install-token", + HTTPClient: server.Client(), + } + if err := Flush(context.Background(), opts); err == nil { + t.Fatal("Flush() error = nil, want terminal hosted failure") + } + if requests != 1 { + t.Fatalf("request count = %d, want one terminal attempt without smaller-batch retries", requests) + } + + state, err := LoadState(statePath) + if err != nil { + t.Fatalf("LoadState() error = %v", err) + } + if state.FailureKind != failureKindTerminalConfig || state.FailureStatus != http.StatusBadRequest { + t.Fatalf("state failure = kind %q status %d, want terminal config 400", state.FailureKind, state.FailureStatus) + } + if state.CooldownUntil == "" { + t.Fatalf("state = %+v, want cooldown", state) + } + if state.UpdatedAfter != "" || state.ActionID != "" { + t.Fatalf("state = %+v, want cooldown without cursor advance", state) + } + + saveTestDecision(t, store, "session-2", "toolu_2") + if err := Flush(context.Background(), opts); err == nil { + t.Fatal("Flush() error = nil, want cooldown pause diagnostic") + } + if requests != 1 { + t.Fatalf("request count = %d, want cooldown to skip hosted repost", requests) + } + state, err = LoadState(statePath) + if err != nil { + t.Fatalf("LoadState() error = %v", err) + } + if state.UpdatedAfter != "" || state.ActionID != "" { + t.Fatalf("state = %+v, want cooldown without cursor advance", state) + } +} + +func TestFlushCredentialChangeClearsTerminalCooldown(t *testing.T) { + store, dbPath := testStore(t) + saveTestDecision(t, store, "session-1", "toolu_1") + + requests := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requests++ + if r.Header.Get("Authorization") == "Bearer fixed-install-token" { + w.WriteHeader(http.StatusAccepted) + return + } + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"message":"authorization ledger organization_id does not match authenticated principal"}`)) + })) + t.Cleanup(server.Close) + + statePath := filepath.Join(t.TempDir(), "stream-state.json") + opts := Options{ + DBPath: dbPath, + StatePath: statePath, + CloudURL: server.URL, + OrganizationID: "org_123", + InstallationID: "ins_0123456789abcdefghijklmnopqrstuv", + InstallToken: "stale-install-token", + HTTPClient: server.Client(), + } + if err := Flush(context.Background(), opts); err == nil { + t.Fatal("Flush() error = nil, want terminal hosted failure") + } + + opts.InstallToken = "fixed-install-token" + if err := Flush(context.Background(), opts); err != nil { + t.Fatalf("Flush() after credential change error = %v", err) + } + if requests != 2 { + t.Fatalf("request count = %d, want retry after credential change", requests) + } + state, err := LoadState(statePath) + if err != nil { + t.Fatalf("LoadState() error = %v", err) + } + if state.FailureKind != "" || state.CooldownUntil != "" { + t.Fatalf("state = %+v, want terminal cooldown cleared after success", state) + } + if state.UpdatedAfter == "" || state.ActionID == "" { + t.Fatalf("state = %+v, want cursor advanced after success", state) + } +} + +func TestFlushPausesGeneric400WithoutAdvancingCursor(t *testing.T) { + store, dbPath := testStore(t) + saveTestDecision(t, store, "session-1", "toolu_1") + saveTestDecision(t, store, "session-2", "toolu_2") + + var actionCounts []int + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var payload Payload + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("Decode() error = %v", err) + } + actionCounts = append(actionCounts, len(payload.Actions)) + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"message":"schema validation failed"}`)) + })) + t.Cleanup(server.Close) + + statePath := filepath.Join(t.TempDir(), "stream-state.json") + opts := Options{ + DBPath: dbPath, + StatePath: statePath, + CloudURL: server.URL, + OrganizationID: "org_123", + InstallationID: "ins_0123456789abcdefghijklmnopqrstuv", + InstallToken: "test-install-token", + HTTPClient: server.Client(), + } + if err := Flush(context.Background(), opts); err == nil { + t.Fatal("Flush() error = nil, want terminal hosted failure") + } + if len(actionCounts) != 1 || actionCounts[0] <= 1 { + t.Fatalf("posted action counts = %v, want one terminal attempt without smaller-batch retries", actionCounts) + } + state, err := LoadState(statePath) + if err != nil { + t.Fatalf("LoadState() error = %v", err) + } + if state.FailureKind != failureKindTerminalConfig || state.UpdatedAfter != "" || state.ActionID != "" { + t.Fatalf("state = %+v, want terminal cooldown without cursor advance", state) + } + + firstFlushRequests := len(actionCounts) + if err := Flush(context.Background(), opts); err == nil { + t.Fatal("Flush() error = nil, want cooldown pause diagnostic") + } + if len(actionCounts) != firstFlushRequests { + t.Fatalf("posted action counts after second flush = %v, want no hosted repost during cooldown", actionCounts) + } + state, err = LoadState(statePath) + if err != nil { + t.Fatalf("LoadState() error = %v", err) + } + if state.UpdatedAfter != "" || state.ActionID != "" { + t.Fatalf("state = %+v, want cursor still paused", state) + } +} + +func TestFlushScalarSizeLike400EntersCooldownWithoutBatchRetries(t *testing.T) { + store, dbPath := testStore(t) + saveTestDecision(t, store, "session-1", "toolu_1") + saveTestDecision(t, store, "session-2", "toolu_2") + + var actionCounts []int + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var payload Payload + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("Decode() error = %v", err) + } + actionCounts = append(actionCounts, len(payload.Actions)) + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"message":"installation_id must contain no more than 64 characters"}`)) + })) + t.Cleanup(server.Close) + + statePath := filepath.Join(t.TempDir(), "stream-state.json") + err := Flush(context.Background(), Options{ + DBPath: dbPath, + StatePath: statePath, + CloudURL: server.URL, + OrganizationID: "org_123", + InstallationID: "ins_0123456789abcdefghijklmnopqrstuv", + InstallToken: "test-install-token", + HTTPClient: server.Client(), + }) + if err == nil { + t.Fatal("Flush() error = nil, want terminal hosted failure") + } + if len(actionCounts) != 1 || actionCounts[0] <= 1 { + t.Fatalf("posted action counts = %v, want one terminal attempt without smaller-batch retries", actionCounts) + } + state, err := LoadState(statePath) + if err != nil { + t.Fatalf("LoadState() error = %v", err) + } + if state.FailureKind != failureKindTerminalConfig || state.UpdatedAfter != "" || state.ActionID != "" { + t.Fatalf("state = %+v, want terminal cooldown without cursor advance", state) + } +} + +func TestFlushDoesNotAdvanceCursorForAuthFailures(t *testing.T) { + for _, status := range []int{http.StatusUnauthorized, http.StatusForbidden} { + t.Run(http.StatusText(status), func(t *testing.T) { + store, dbPath := testStore(t) + saveTestDecision(t, store, "session-1", "toolu_1") + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(status) + _, _ = w.Write([]byte(`{"message":"auth failed"}`)) + })) + t.Cleanup(server.Close) + + statePath := filepath.Join(t.TempDir(), "stream-state.json") + err := Flush(context.Background(), Options{ + DBPath: dbPath, + StatePath: statePath, + CloudURL: server.URL, + OrganizationID: "org_123", + InstallationID: "ins_0123456789abcdefghijklmnopqrstuv", + InstallToken: "test-install-token", + HTTPClient: server.Client(), + }) + if err == nil { + t.Fatalf("Flush() error = nil, want status %d failure", status) + } + if _, err := os.Stat(statePath); !os.IsNotExist(err) { + t.Fatalf("state file error = %v, want not exist", err) + } + }) + } +} + +func TestFlushDoesNotAdvanceCursorForUnprocessableValidation(t *testing.T) { + store, dbPath := testStore(t) + saveTestDecision(t, store, "session-1", "toolu_1") + + requests := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + requests++ + w.WriteHeader(http.StatusUnprocessableEntity) + _, _ = w.Write([]byte(`{"message":"installation_id is invalid"}`)) + })) + t.Cleanup(server.Close) + + statePath := filepath.Join(t.TempDir(), "stream-state.json") + err := Flush(context.Background(), Options{ + DBPath: dbPath, + StatePath: statePath, + CloudURL: server.URL, + OrganizationID: "org_123", + InstallationID: "ins_0123456789abcdefghijklmnopqrstuv", + InstallToken: "test-install-token", + HTTPClient: server.Client(), + }) + if err == nil { + t.Fatal("Flush() error = nil, want terminal hosted failure") + } + if requests != 1 { + t.Fatalf("request count = %d, want one terminal attempt", requests) + } + state, err := LoadState(statePath) + if err != nil { + t.Fatalf("LoadState() error = %v", err) + } + if state.UpdatedAfter != "" || state.ActionID != "" || state.FailureKind != failureKindTerminalConfig { + t.Fatalf("state = %+v, want terminal cooldown without cursor advance", state) + } +} + +func TestFlushReportsMalformedCooldownState(t *testing.T) { + store, dbPath := testStore(t) + saveTestDecision(t, store, "session-1", "toolu_1") + + requests := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + requests++ + w.WriteHeader(http.StatusAccepted) + })) + t.Cleanup(server.Close) + + statePath := filepath.Join(t.TempDir(), "stream-state.json") + opts := Options{ + DBPath: dbPath, + StatePath: statePath, + CloudURL: server.URL, + OrganizationID: "org_123", + InstallationID: "ins_0123456789abcdefghijklmnopqrstuv", + InstallToken: "test-install-token", + HTTPClient: server.Client(), + } + if err := SaveState(statePath, State{ + FailureKind: failureKindTerminalConfig, + FailureConfig: configFingerprint(opts), + CooldownUntil: "not-a-timestamp", + }); err != nil { + t.Fatalf("SaveState() error = %v", err) + } + + err := Flush(context.Background(), opts) + if err == nil { + t.Fatal("Flush() error = nil, want malformed cooldown state") + } + if !strings.Contains(err.Error(), "parse managed stream cooldown state") { + t.Fatalf("Flush() error = %q, want malformed cooldown state", err.Error()) + } + if requests != 0 { + t.Fatalf("request count = %d, want no hosted post with malformed cooldown state", requests) + } } func TestFlushRedactsHostedValidationBody(t *testing.T) { @@ -310,7 +635,7 @@ func TestFlushRedactsHostedValidationBody(t *testing.T) { } } -func TestFlushDoesNotAdvanceCursorPastHostedRejectedMinimumBatch(t *testing.T) { +func TestFlushDoesNotAdvanceCursorPastGenericHostedRejectedMinimumBatch(t *testing.T) { store, dbPath := testStore(t) saveTestDecision(t, store, "session-1", "toolu_1") @@ -334,8 +659,15 @@ func TestFlushDoesNotAdvanceCursorPastHostedRejectedMinimumBatch(t *testing.T) { if err == nil { t.Fatal("Flush() error = nil, want hosted validation failure") } - if _, err := os.Stat(statePath); !os.IsNotExist(err) { - t.Fatalf("state file error = %v, want not exist", err) + if !strings.Contains(err.Error(), "entered cooldown after terminal hosted ingest failure") { + t.Fatalf("Flush() error = %q, want terminal cooldown diagnostic", err.Error()) + } + state, err := LoadState(statePath) + if err != nil { + t.Fatalf("LoadState() error = %v", err) + } + if state.UpdatedAfter != "" || state.ActionID != "" || state.FailureKind != failureKindTerminalConfig { + t.Fatalf("state = %+v, want terminal cooldown without cursor advance", state) } } @@ -457,6 +789,60 @@ func TestParseStateUpdatedAfterAcceptsLegacyTimestampFormats(t *testing.T) { } } +func TestStateJSONBoundaryPreservesCursorOnlyAndFailureShape(t *testing.T) { + statePath := filepath.Join(t.TempDir(), "stream-state.json") + if err := os.WriteFile(statePath, []byte(`{"updated_after":"2026-06-08T12:20:07.853885Z","action_id":"action-123"}`), 0o600); err != nil { + t.Fatalf("WriteFile() error = %v", err) + } + + state, err := LoadState(statePath) + if err != nil { + t.Fatalf("LoadState() error = %v", err) + } + if state.UpdatedAfter != "2026-06-08T12:20:07.853885Z" || state.ActionID != "action-123" { + t.Fatalf("state = %+v, want cursor-only persisted shape", state) + } + if state.FailureKind != "" || state.FailureConfig != "" { + t.Fatalf("state = %+v, want no failure state from cursor-only shape", state) + } + + state.FailureKind = failureKindTerminalConfig + state.FailureStatus = http.StatusBadRequest + state.FailureMessage = "organization mismatch" + state.FailureConfig = streamConfigFingerprint("config-key") + state.FailureCount = 2 + state.CooldownUntil = "2026-06-08T12:35:07.853885Z" + if err := SaveState(statePath, state); err != nil { + t.Fatalf("SaveState() error = %v", err) + } + + data, err := os.ReadFile(statePath) + if err != nil { + t.Fatalf("ReadFile() error = %v", err) + } + var persisted struct { + UpdatedAfter string `json:"updated_after"` + ActionID string `json:"action_id"` + FailureKind string `json:"failure_kind"` + FailureStatus int `json:"failure_status"` + FailureMessage string `json:"failure_message"` + FailureConfig string `json:"failure_config"` + FailureCount int `json:"failure_count"` + CooldownUntil string `json:"cooldown_until"` + } + if err := json.Unmarshal(data, &persisted); err != nil { + t.Fatalf("Unmarshal() error = %v", err) + } + if persisted.FailureKind != "terminal_config" || persisted.FailureConfig != "config-key" { + t.Fatalf("persisted state = %+v, want string failure fields", persisted) + } + if persisted.UpdatedAfter != state.UpdatedAfter || persisted.ActionID != state.ActionID || + persisted.FailureStatus != state.FailureStatus || persisted.FailureMessage != state.FailureMessage || + persisted.FailureCount != state.FailureCount || persisted.CooldownUntil != state.CooldownUntil { + t.Fatalf("persisted state = %+v, want state fields preserved", persisted) + } +} + func testStore(t *testing.T) (*sqlite.Store, string) { t.Helper() dbPath := filepath.Join(t.TempDir(), "guard.db")